Skip to main content

hashiverse_lib/client/post_bundle/
live_post_bundle_manager.rs

1//! # Production [`PostBundleManager`] — cache, fetch, heal
2//!
3//! The real implementation of
4//! [`crate::client::post_bundle::post_bundle_manager::PostBundleManager`] used in
5//! production. Lookup proceeds in three stages:
6//!
7//! 1. **Cache** — check `BUCKET_POST_BUNDLE` in
8//!    [`crate::client::client_storage::client_storage::ClientStorage`]. Serve hot entries
9//!    immediately; serve stale entries only if the bundle is already sealed.
//! 2. **Network** — otherwise walk peers closest to the bucket's location id via a
//!    [`crate::client::peer_tracker::peer_iterator::PeerIterator`] and issue
//!    `GetPostBundleV1` RPCs, with the walk's radius supplied by the
//!    [`crate::client::caching::cache_radius_tracker`] to avoid re-hammering
//!    already-cached peers. Concurrent lookups for the same `location_id` are
//!    serialized via a per-key `Mutex`; each waiter re-checks the cache once it holds the
//!    lock, so a burst of identical lookups normally results in a single network walk.
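//! 3. **Heal** — after every network fetch, pass all collected bundles to
//!    [`crate::client::post_bundle::post_bundle_healing`], which reconciles divergent
//!    copies in the background. The raw bundle bytes are also offered, via
//!    [`crate::client::caching::post_bundle_cache_uploader`], to peers that returned a
//!    cache-request token.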
19
20use crate::anyhow_assert_eq;
21use crate::client::caching::cache_radius_tracker::CacheRadiusTracker;
22use crate::client::caching::post_bundle_cache_uploader;
23use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_POST_BUNDLE};
24use crate::client::peer_tracker::peer_tracker::PeerTracker;
25use crate::client::post_bundle::post_bundle_healing;
26use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
27use crate::protocol::payload::payload::{CacheRequestTokenV1, GetPostBundleResponseV1, GetPostBundleV1, PayloadRequestKind, PayloadResponseKind};
28use crate::protocol::posting::encoded_post_bundle::EncodedPostBundleV1;
29use crate::protocol::rpc;
30use crate::tools::buckets::BucketLocation;
31use crate::tools::config::CLIENT_POST_BUNDLE_CACHE_DURATION;
32use crate::tools::runtime_services::RuntimeServices;
33use crate::tools::time::TimeMillis;
34use crate::tools::tools::LeadingAgreementBits;
35use crate::tools::types::Id;
36use crate::tools::{config, json};
37use bytes::Bytes;
38use log::{info, trace, warn};
39use scopeguard::defer;
40use std::collections::HashMap;
41use std::sync::Arc;
42use tokio::sync::{Mutex, RwLock};
43
44/// The production [`PostBundleManager`] implementation — the one that actually talks to the
45/// network.
46///
47/// `LivePostBundleManager` is the glue between the pure-logic timeline code and the live
48/// network. Each `get_post_bundle` call:
49///
50/// 1. Checks [`ClientStorage`] under `BUCKET_POST_BUNDLE` for a sufficiently fresh cached
51///    copy (freshness is governed by `CLIENT_POST_BUNDLE_CACHE_DURATION`).
52/// 2. Falls back to a `GetPostBundleV1` RPC against the closest peers in the
53///    [`PeerTracker`], attributing PoW to the locally-held `sponsor_id`.
54/// 3. Initiates post-bundle healing when the bundle has gaps — see
55///    [`crate::client::post_bundle::post_bundle_healing`].
56///
57/// Concurrent requests for the same bundle are de-duplicated via `post_bundle_inflight` so
58/// a burst of timeline scrolls cannot fan out into redundant RPCs. The
59/// [`crate::client::caching::cache_radius_tracker::CacheRadiusTracker`] helps the caching
60/// role of a server decide how widely to mirror bundles.
61pub struct LivePostBundleManager {
62    runtime_services: Arc<RuntimeServices>,
63    client_storage: Arc<dyn ClientStorage>,
64    peer_tracker: Arc<RwLock<PeerTracker>>,
65    sponsor_id: Id,
66    post_bundle_inflight: Mutex<HashMap<Id, Arc<Mutex<()>>>>,
67    post_bundle_cache_radius_tracker: CacheRadiusTracker,
68}
69
70impl LivePostBundleManager {
71    pub fn new(
72        runtime_services: Arc<RuntimeServices>,
73        sponsor_id: Id,
74        client_storage: Arc<dyn ClientStorage>,
75        peer_tracker: Arc<RwLock<PeerTracker>>,
76    ) -> Self {
77        Self {
78            runtime_services,
79            client_storage,
80            peer_tracker,
81            sponsor_id,
82            post_bundle_inflight: Mutex::new(HashMap::new()),
83            post_bundle_cache_radius_tracker: CacheRadiusTracker::new(CLIENT_POST_BUNDLE_CACHE_DURATION.const_mul(5)), // We let these cache radius caches last longer than our own local cache of the bundles...
84        }
85    }
86}
87
88#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
89#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
90impl PostBundleManager
91    for LivePostBundleManager
92{
93    async fn get_post_bundle(&self, bucket_location: &BucketLocation, time_millis: TimeMillis) -> anyhow::Result<EncodedPostBundleV1> {
94        let get_from_cache = async |location_id: Id, time_millis: TimeMillis, reason: &str| -> Option<EncodedPostBundleV1> {
95            let result: anyhow::Result<Option<EncodedPostBundleV1>> = try {
96                let raw = self.client_storage.get(BUCKET_POST_BUNDLE, &location_id.to_hex_str(), time_millis).await?;
97                if let Some(raw) = raw {
98                    let bundle = EncodedPostBundleV1::from_bytes(Bytes::from(raw), true)?;
99                    if bundle.header.sealed {
100                        trace!("Using cached sealed PostBundle for {} at {}", location_id, reason);
101                        Some(bundle)
102                    }
103                    else {
104                        let duration = time_millis - bundle.header.time_millis;
105                        if duration < CLIENT_POST_BUNDLE_CACHE_DURATION {
106                            trace!("Using cached PostBundle for {} (age {}) at {}", location_id, duration, reason);
107                            Some(bundle)
108                        }
109                        else {
110                            trace!("Cached PostBundle for {} expired (age {}) at {}", location_id, duration, reason);
111                            None
112                        }
113                    }
114                }
115                else {
116                    None
117                }
118            };
119
120            result.unwrap_or_else(|e| {
121                warn!("discarding problematic cached PostBundle: {}", e);
122                None
123            })
124        };
125
126        // Check the cache
127        if let Some(cached) = get_from_cache(bucket_location.location_id, time_millis, "preflight").await {
128            return Ok(cached);
129        }
130
131        // Inflight deduplication: get or create a per-key mutex
132        let key_lock = {
133            let mut inflight = self.post_bundle_inflight.lock().await;
134            inflight.entry(bucket_location.location_id).or_insert_with(|| Arc::new(Mutex::new(()))).clone()
135        };
136        let _inflight_guard = key_lock.lock().await;
137        defer!(if let Ok(mut m) = self.post_bundle_inflight.try_lock() {
138            m.remove(&bucket_location.location_id);
139        });
140
141        // Re-check the cache - a concurrent caller may have just finished fetching and stored the result.
142        if let Some(cached) = get_from_cache(bucket_location.location_id, time_millis, "postflight").await {
143            return Ok(cached);
144        }
145
146        let cache_radius = self.post_bundle_cache_radius_tracker.get(bucket_location.location_id, time_millis);
147
148        // We need to refresh...
149        let mut peers_visited = Vec::new();
150        let mut already_retrieved_peer_ids: Vec<Id> = Vec::new();
151        let mut encoded_post_bundles = Vec::new();
152        let mut cache_request_tokens: Vec<CacheRequestTokenV1> = Vec::new();
153        let mut positive_responder_leading_agreement_bits: Vec<LeadingAgreementBits> = Vec::new();
154        let mut bundle_bytes_for_upload: Vec<(Id, Bytes)> = Vec::new();
155        {
156            let mut peer_tracker = self.peer_tracker.write().await;
157            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, cache_radius).await?;
158            while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
159                if already_retrieved_peer_ids.contains(&peer.id) {
160                    continue;
161                }
162
163                let result: anyhow::Result<()> = try {
164                    info!("Requesting PostBundle bucket_location={} with leading_agreement_bits={} from peer {}", bucket_location, leading_agreement_bits, peer);
165
166                    let request = GetPostBundleV1 {
167                        bucket_location: bucket_location.clone(),
168                        peers_visited: peers_visited.clone(),
169                        already_retrieved_peer_ids: already_retrieved_peer_ids.clone(),
170                    };
171                    let request = json::struct_to_bytes(&request)?;
172                    let response = rpc::rpc::rpc_server_known(&self.runtime_services, &self.sponsor_id, &peer, PayloadRequestKind::GetPostBundleV1, request).await?;
173                    anyhow_assert_eq!(&PayloadResponseKind::GetPostBundleResponseV1, &response.response_request_kind);
174                    let response = GetPostBundleResponseV1::from_bytes(response.bytes)?;
175
176                    peers_visited.push(peer.clone());
177                    peer_iter.add_peers(response.peers_nearer);
178
179                    if let Some(token) = response.cache_request_token {
180                        cache_request_tokens.push(token);
181                    }
182
183                    let mut found_bundle_from_this_peer = false;
184
185                    for cached_bytes in response.post_bundles_cached {
186                        let process_result: anyhow::Result<()> = try {
187                            let post_bundle = EncodedPostBundleV1::from_bytes(cached_bytes.clone(), true)?;
188                            post_bundle.header.verify()?;
189                            trace!("Retrieved cached PostBundle from peer.id={} with {} posts", post_bundle.header.peer.id, post_bundle.header.num_posts);
190
191                            already_retrieved_peer_ids.push(post_bundle.header.peer.id);
192                            bundle_bytes_for_upload.push((post_bundle.header.peer.id, cached_bytes));
193                            encoded_post_bundles.push(post_bundle);
194                            found_bundle_from_this_peer = true;
195                        };
196                        if let Err(e) = process_result {
197                            warn!("Error processing cached PostBundle: {}", e);
198                        }
199                    }
200
201                    if let Some(post_bundle_raw) = response.post_bundle {
202                        let post_bundle = EncodedPostBundleV1::from_bytes(post_bundle_raw.clone(), true)?;
203                        post_bundle.header.verify()?;
204                        trace!("Retrieved PostBundle from peer.id={} with {} posts", post_bundle.header.peer.id, post_bundle.header.num_posts);
205
206                        already_retrieved_peer_ids.push(post_bundle.header.peer.id);
207                        bundle_bytes_for_upload.push((post_bundle.header.peer.id, post_bundle_raw));
208                        encoded_post_bundles.push(post_bundle);
209                        found_bundle_from_this_peer = true;
210                    }
211
212                    if found_bundle_from_this_peer {
213                        positive_responder_leading_agreement_bits.push(leading_agreement_bits);
214                    }
215
216                    if encoded_post_bundles.len() >= config::REDUNDANT_SERVERS_PER_POST {
217                        break;
218                    }
219                };
220
221                if let Err(e) = result {
222                    warn!("Error retrieving PostBundle from peer {}: {}", peer, e);
223                    peer_iter.remove_peer(&peer);
224                }
225            }
226
227            let result = peer_tracker.flush().await;
228            if let Err(e) = result {
229                warn!("Error flushing peer tracker: {}", e);
230            }
231        }
232
233        info!("Discovered {} post bundles after visiting {} peers", encoded_post_bundles.len(), peers_visited.len());
234
235        // Store our new cache radius - the peer "furthest out" from our bundle's location_id
236        if let Some(new_radius) = positive_responder_leading_agreement_bits.iter().copied().min() {
237            self.post_bundle_cache_radius_tracker.update(bucket_location.location_id, new_radius, time_millis);
238        }
239
240        // Select the best bundle (most posts) from everything collected, including cached copies.
241        let best_encoded_post_bundle = {
242            encoded_post_bundles
243                .iter()
244                .max_by_key(|b| b.header.num_posts)
245                .cloned()
246                .ok_or_else(|| anyhow::anyhow!("No post bundles discovered for {}", bucket_location.location_id))?
247        };
248
249        // Cache it
250        trace!("Caching PostBundle for {} with {} posts", bucket_location.location_id, best_encoded_post_bundle.header.num_posts);
251        let encoded_post_bundle_bytes = best_encoded_post_bundle.to_bytes()?;
252        self.client_storage.put(BUCKET_POST_BUNDLE, &bucket_location.location_id.to_hex_str(), encoded_post_bundle_bytes.to_vec(), time_millis).await?;
253
254        // Kick off healing
255        post_bundle_healing::heal_post_bundles(self.runtime_services.clone(), self.sponsor_id, bucket_location.clone(), &peers_visited, encoded_post_bundles);
256
257        // Kick off remote cache populating
258        post_bundle_cache_uploader::upload_post_bundle_caches(self.runtime_services.clone(), self.sponsor_id, cache_request_tokens, bundle_bytes_for_upload);
259
260        Ok(best_encoded_post_bundle)
261    }
262}