// hashiverse_server_lib/server/post_bundle_caching.rs
//! # Post-bundle read cache
//!
//! A `moka` weighted cache that holds recently-served post bundles in RAM so repeated
//! reads for the same `(location_id, originator)` don't thrash disk. Multiple versions
//! per `location_id` are kept because different peer originators hold different slices
//! of the same bucket — the cache index is keyed by `(location_id, originator_id)`.
//!
//! The cache doubles as a **propagation hint engine**: once a hot entry has been
//! requested at least [`crate::server::post_bundle_caching_shared::CACHE_HIT_THRESHOLD`]
//! times, the next requester's response includes a signed
//! [`hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1`] asking them to
//! forward the bundle to additional servers. This turns read pressure into better
//! cache distribution without requiring explicit coordination.
//!
//! An `inflight` sibling cache tracks outstanding tokens so the same hot bundle doesn't
//! generate redundant cache-request tokens in parallel.
17
18use bytes::Bytes;
19use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
20use hashiverse_lib::protocol::peer::Peer;
21use hashiverse_lib::tools::buckets::BucketLocation;
22use hashiverse_lib::tools::server_id::ServerId;
23use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
24use hashiverse_lib::tools::tools::leading_agreement_bits_xor;
25use hashiverse_lib::tools::types::Id;
26use moka::sync::Cache;
27use std::collections::HashMap;
28use std::sync::{Arc, Mutex};
29
30use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
31
/// Placeholder weight for post bundle entries — approximates the real data that will replace it.
/// Ballpark: 20 posts × 50 KB each × up to 5 originators ≈ 5 MB per location.
const POST_BUNDLE_PLACEHOLDER_WEIGHT: u32 = 4 << 20;
35
36// --------------------------------------------------------------------------------------------
37// CachedPostBundleLocationEntry
38// --------------------------------------------------------------------------------------------
39
40/// Per-location entry for post bundles.
41/// Multiple originator versions are stored — different servers may hold different subsets of posts.
42struct CachedPostBundleLocationEntry {
43    /// Originator peer_id → cached bundle.
44    bundles: HashMap<Id, CachedBundle>,
45    hit_count: u32,
46}
47
48impl CachedPostBundleLocationEntry {
49    fn placeholder() -> Self {
50        Self { bundles: HashMap::new(), hit_count: 0 }
51    }
52
53    fn weight(&self) -> u32 {
54        let total: u32 = self.bundles.values().map(|b| b.bytes.len() as u32).sum();
55        if total == 0 { POST_BUNDLE_PLACEHOLDER_WEIGHT } else { total }
56    }
57}
58
59// --------------------------------------------------------------------------------------------
60// PostBundleCache
61// --------------------------------------------------------------------------------------------
62
63/// Intermediate-server cache for `EncodedPostBundleV1` data.
64///
65/// Two Moka caches:
66/// - `bundles`: weighted `Cache<Id, Arc<Mutex<CachedPostBundleLocationEntry>>>` with TTI.
67///   If a location_id hasn't been queried within `CACHE_LOCATION_TTI`, the entire entry is evicted.
68///   Individual bundles within an entry may also be stale (live bundles have a per-bundle
69///   `expires_at`; sealed bundles are never individually stale).
70/// - `inflight`: `Cache<Id, ()>` with 30-second TTL — tracks locations for which a
71///   `CacheRequestToken` has been issued but the client hasn't uploaded yet.
72pub struct PostBundleCache {
73    max_originators_per_location: usize,
74    bundles: Cache<Id, Arc<Mutex<CachedPostBundleLocationEntry>>>,
75    inflight: Cache<Id, ()>,
76}
77
78impl PostBundleCache {
79    pub fn new(max_originators_per_location: usize, max_bytes: u64) -> Self {
80        let bundles = Cache::builder()
81            .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleLocationEntry>>| {
82                entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_PLACEHOLDER_WEIGHT)
83            })
84            .max_capacity(max_bytes)
85            .time_to_idle(CACHE_LOCATION_TTI)
86            .build();
87
88        let inflight = Cache::builder()
89            .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
90            .build();
91
92        Self { max_originators_per_location, bundles, inflight }
93    }
94
95    /// Called by the dispatch handler when serving a `GetPostBundleV1` request.
96    ///
97    /// - `bucket_location` — used as the cache key (`location_id`) and included in any token issued.
98    /// - `already_retrieved_peer_ids` — originator IDs the client already has; filtered out of `cached_items`.
99    /// - `peer_self` / `server_id` — used to sign the `CacheRequestToken` if one is issued.
100    /// - `now` — current time.
101    pub fn on_get(
102        &self,
103        bucket_location: &BucketLocation,
104        already_retrieved_peer_ids: &[Id],
105        peer_self: &Peer,
106        server_id: &ServerId,
107        now: TimeMillis,
108    ) -> GetCacheResult {
109        let location_id = bucket_location.location_id;
110        let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleLocationEntry::placeholder())));
111
112        let (cached_items, already_cached_peer_ids, should_issue_token) = {
113            let mut entry = entry_arc.lock().unwrap();
114            entry.hit_count += 1;
115
116            let already_retrieved_set: std::collections::HashSet<Id> = already_retrieved_peer_ids.iter().copied().collect();
117            let cached_items: Vec<Bytes> = entry.bundles
118                .iter()
119                .filter(|(originator_id, bundle)| !already_retrieved_set.contains(originator_id) && !bundle.is_stale(now))
120                .map(|(_, bundle)| bundle.bytes.clone())
121                .collect();
122
123            let already_cached_peer_ids: Vec<Id> = entry.bundles.keys().copied().collect();
124            let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
125            (cached_items, already_cached_peer_ids, should_issue_token)
126        };
127
128        let cache_request_token = if should_issue_token {
129            self.inflight.insert(location_id, ());
130            let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
131            Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
132        } else {
133            None
134        };
135
136        GetCacheResult { cached_items, cache_request_token }
137    }
138
139    /// Called by the dispatch handler when a `CachePostBundleV1` upload arrives.
140    /// The token must have been verified and expiry-checked by the caller.
141    /// Returns `true` if accepted, `false` if the entry was evicted before the upload arrived.
142    pub fn on_upload(
143        &self,
144        location_id: Id,
145        originator_peer_id: Id,
146        bundle_bytes: Bytes,
147        server_time: TimeMillis,
148        is_sealed: bool,
149    ) -> bool {
150        let entry_arc = match self.bundles.get(&location_id) {
151            Some(e) => e,
152            None => return false,   // Entry evicted between token issuance and upload — reject.
153        };
154
155        let mut entry = entry_arc.lock().unwrap();
156        let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
157        let bundle = CachedBundle { bytes: bundle_bytes, expires_at };
158
159        // Insert (or update) the new originator
160        entry.bundles.insert(originator_peer_id, bundle);
161
162        // If over capacity, evict the worst entry: furthest from location_id,
163        // breaking ties by expires_at (stalest loses). This may evict the entry
164        // we just inserted if it's the worst — that's correct.
165        // This will also prevent sybils trying to insert garbage cache items - they would have to control the location_id
166        while entry.bundles.len() > self.max_originators_per_location {
167            let evict_key = entry.bundles
168                .iter()
169                .min_by(|(id_a, bundle_a), (id_b, bundle_b)| {
170                    let distance_a = leading_agreement_bits_xor(id_a.as_ref(), location_id.as_ref());
171                    let distance_b = leading_agreement_bits_xor(id_b.as_ref(), location_id.as_ref());
172                    distance_a.cmp(&distance_b).then_with(|| {
173                        let expires_a = bundle_a.expires_at.unwrap_or(TimeMillis(i64::MAX));
174                        let expires_b = bundle_b.expires_at.unwrap_or(TimeMillis(i64::MAX));
175                        expires_a.cmp(&expires_b)
176                    })
177                })
178                .map(|(id, _)| *id);
179            if let Some(k) = evict_key {
180                entry.bundles.remove(&k);
181            }
182        }
183
184        // Return whether our insertion survived eviction
185        entry.bundles.contains_key(&originator_peer_id)
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
    use hashiverse_lib::tools::server_id::ServerId;
    use hashiverse_lib::tools::time::TimeMillis;
    use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
    use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
    use hashiverse_lib::tools::types::{Id, Pow};

    /// Builds a throwaway server identity plus its `Peer` representation for signing tokens.
    async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, hashiverse_lib::protocol::peer::Peer)> {
        let clock = RealTimeProvider::default();
        let pow_gen = StubParallelPowGenerator::new();
        let server_id = ServerId::new(&clock, Pow(0), true, &pow_gen).await?;
        let peer = server_id.to_peer(&clock)?;
        Ok((server_id, peer))
    }

    /// A random user-bucket location to key the cache with.
    fn make_test_bucket_location() -> BucketLocation {
        BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000))
            .unwrap()
    }

    #[tokio::test]
    async fn test_below_threshold_no_token() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Every call short of the threshold serves nothing and issues no token.
        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
            let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
            assert!(res.cache_request_token.is_none());
            assert!(res.cached_items.is_empty());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Warm the entry up to just below the threshold.
        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
            let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
            assert!(res.cache_request_token.is_none());
        }

        // The threshold-th call issues a token…
        let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert!(res.cache_request_token.is_some());

        // …and the inflight cache suppresses a duplicate on the next call.
        let res_again = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert!(res_again.cache_request_token.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_and_retrieval() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);
        let originator = Id::random();
        let payload = Bytes::from_static(b"test_bundle");

        // A first get registers the placeholder entry so the upload has a target.
        cache.on_get(&location, &[], &peer_self, &server_id, now);

        let accepted = cache.on_upload(location.location_id, originator, payload.clone(), now, false);
        assert!(accepted);

        let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert_eq!(res.cached_items, vec![payload]);

        Ok(())
    }

    #[tokio::test]
    async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);
        let originator = Id::random();

        cache.on_get(&location, &[], &peer_self, &server_id, now);
        cache.on_upload(location.location_id, originator, Bytes::from_static(b"bundle"), now, false);

        // A client that already holds this originator's bundle gets nothing back.
        let res = cache.on_get(&location, &[originator], &peer_self, &server_id, now);
        assert!(res.cached_items.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location_id = Id::random();
        let originator = Id::random();

        // No prior on_get — the location entry was never created, so the upload is rejected.
        let accepted =
            cache.on_upload(location_id, originator, Bytes::from_static(b"bundle"), TimeMillis(1_000_000), false);
        assert!(!accepted);

        Ok(())
    }
}
303}