// hashiverse_server_lib/server/post_bundle_feedback_caching.rs

1//! # Post-bundle feedback read cache
2//!
3//! Sister of [`crate::server::post_bundle_caching`] for the feedback layer.
4//! One key difference: feedback is pre-merged by the client before it arrives at
5//! the server, so the cache stores a single canonical entry per `location_id`
6//! instead of one per originator. This also means the cached value is typically
7//! much smaller (~8 KB placeholder weight vs. the post-bundle cache's 4 MB), so
8//! more entries fit into the same memory budget.
9//!
10//! Cache-request token issuance and the hit-count heuristic work identically to
11//! the post-bundle cache, so hot feedback gets gossiped out in the same way hot
12//! bundles do.
13
14use bytes::Bytes;
15use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
16use hashiverse_lib::protocol::peer::Peer;
17use hashiverse_lib::tools::buckets::BucketLocation;
18use hashiverse_lib::tools::server_id::ServerId;
19use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
20use hashiverse_lib::tools::types::Id;
21use moka::sync::Cache;
22use std::sync::{Arc, Mutex};
23
24use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
25
/// Placeholder weight for feedback entries — approximates a merged feedback bundle.
/// Ballpark: 20 posts × 10 feedbacks × 50 bytes each ≈ 10 KB.
/// NOTE(review): the ballpark (~10 KB) and the constant (8 KiB) disagree slightly —
/// confirm whether 8 KiB is a deliberate conservative round-down or should be 10 * 1024.
const POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT: u32 = 8 * 1024;
29
30// --------------------------------------------------------------------------------------------
31// CachedPostBundleFeedbackLocationEntry
32// --------------------------------------------------------------------------------------------
33
/// Per-location entry for feedback.
/// Only one canonical merged result is stored — the client already merges feedback across
/// servers before uploading, so multiple originator versions are not meaningful here.
struct CachedPostBundleFeedbackLocationEntry {
    /// The single merged feedback bundle, together with the originator's peer_id.
    /// `None` while the entry is a freshly inserted placeholder awaiting its first upload.
    bundle: Option<(Id, CachedBundle)>,
    /// Number of `on_get` lookups observed for this location; once it reaches
    /// `CACHE_HIT_THRESHOLD`, a cache-request token may be issued.
    hit_count: u32,
}
42
43impl CachedPostBundleFeedbackLocationEntry {
44    fn placeholder() -> Self {
45        Self { bundle: None, hit_count: 0 }
46    }
47
48    fn weight(&self) -> u32 {
49        self.bundle.as_ref().map(|(_, b)| b.bytes.len() as u32).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
50    }
51}
52
53// --------------------------------------------------------------------------------------------
54// PostBundleFeedbackCache
55// --------------------------------------------------------------------------------------------
56
/// Intermediate-server cache for `EncodedPostBundleFeedbackV1` data.
///
/// Two Moka caches:
/// - `bundles`: weighted `Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>` with TTI.
///   If a location_id hasn't been queried within `CACHE_LOCATION_TTI`, the entry is evicted.
/// - `inflight`: `Cache<Id, ()>` with a `CACHE_REQUEST_TOKEN_TTL_DURATION` TTL — tracks
///   locations for which a cache-request token is outstanding, so duplicate tokens are
///   not issued while an upload is pending.
pub struct PostBundleFeedbackCache {
    bundles: Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>,
    inflight: Cache<Id, ()>,
}
67
68impl PostBundleFeedbackCache {
69    pub fn new(max_bytes: u64) -> Self {
70        let bundles = Cache::builder()
71            .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>| {
72                entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
73            })
74            .max_capacity(max_bytes)
75            .time_to_idle(CACHE_LOCATION_TTI)
76            .build();
77
78        let inflight = Cache::builder()
79            .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
80            .build();
81
82        Self { bundles, inflight }
83    }
84
85    /// Called by the dispatch handler when serving a `GetPostBundleFeedbackV1` request.
86    pub fn on_get(
87        &self,
88        bucket_location: &BucketLocation,
89        already_retrieved_peer_ids: &[Id],
90        peer_self: &Peer,
91        server_id: &ServerId,
92        now: TimeMillis,
93    ) -> GetCacheResult {
94        let location_id = bucket_location.location_id;
95        let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleFeedbackLocationEntry::placeholder())));
96
97        let (cached_items, already_cached_peer_ids, should_issue_token) = {
98            let mut entry = entry_arc.lock().unwrap();
99            entry.hit_count += 1;
100
101            let cached_items: Vec<Bytes> = entry.bundle
102                .iter()
103                .filter(|(originator_id, bundle)| !already_retrieved_peer_ids.contains(originator_id) && !bundle.is_stale(now))
104                .map(|(_, bundle)| bundle.bytes.clone())
105                .collect();
106
107            let already_cached_peer_ids: Vec<Id> = entry.bundle.iter().map(|(id, _)| *id).collect();
108            let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
109            (cached_items, already_cached_peer_ids, should_issue_token)
110        };
111
112        let cache_request_token = if should_issue_token {
113            self.inflight.insert(location_id, ());
114            let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
115            Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
116        } else {
117            None
118        };
119
120        GetCacheResult { cached_items, cache_request_token }
121    }
122
123    /// Called by the dispatch handler when a `CachePostBundleFeedbackV1` upload arrives.
124    /// Unconditionally replaces any previously cached feedback — the uploader has the merged best.
125    /// Returns `true` if accepted, `false` if the entry was evicted before the upload arrived.
126    pub fn on_upload(
127        &self,
128        location_id: Id,
129        originator_peer_id: Id,
130        feedback_bytes: Bytes,
131        server_time: TimeMillis,
132        is_sealed: bool,
133    ) -> bool {
134        let entry_arc = match self.bundles.get(&location_id) {
135            Some(e) => e,
136            None => return false,
137        };
138
139        let mut entry = entry_arc.lock().unwrap();
140        let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
141        entry.bundle = Some((originator_peer_id, CachedBundle { bytes: feedback_bytes, expires_at }));
142        true
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
    use hashiverse_lib::tools::server_id::ServerId;
    use hashiverse_lib::tools::time::TimeMillis;
    use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
    use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
    use hashiverse_lib::tools::types::{Id, Pow};

    /// Builds a throwaway server identity together with its own peer record.
    async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, Peer)> {
        let clock = RealTimeProvider::default();
        let pow_gen = StubParallelPowGenerator::new();
        let server_id = ServerId::new(&clock, Pow(0), true, &pow_gen).await?;
        let peer = server_id.to_peer(&clock)?;
        Ok((server_id, peer))
    }

    /// A user-bucket location with a random id at a fixed, arbitrary timestamp.
    fn make_test_bucket_location() -> BucketLocation {
        BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000)).unwrap()
    }

    #[tokio::test]
    async fn test_below_threshold_no_token() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Staying strictly under the threshold must never produce a token, and the
        // cache is empty, so nothing is served either.
        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
            let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
            assert!(res.cache_request_token.is_none());
            assert!(res.cached_items.is_empty());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Warm up to one below the threshold: still no token.
        for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
            let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
            assert!(res.cache_request_token.is_none());
        }

        // The hit that reaches the threshold issues exactly one token…
        let at_threshold = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert!(at_threshold.cache_request_token.is_some());

        // …and the inflight marker suppresses a second one.
        let after = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert!(after.cache_request_token.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_and_retrieval() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
        let location = make_test_bucket_location();
        let location_id = location.location_id;
        let now = TimeMillis(1_000_000);
        let originator_id = Id::random();
        let feedback_bytes = Bytes::from_static(b"test_feedback");

        // First get creates the placeholder entry so the upload has somewhere to land.
        cache.on_get(&location, &[], &peer_self, &server_id, now);

        assert!(cache.on_upload(location_id, originator_id, feedback_bytes.clone(), now, false));

        // A fresh get now serves exactly the uploaded bytes.
        let res = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert_eq!(res.cached_items, vec![feedback_bytes]);

        Ok(())
    }

    #[tokio::test]
    async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
        let location = make_test_bucket_location();
        let location_id = location.location_id;
        let now = TimeMillis(1_000_000);
        let originator_id = Id::random();

        cache.on_get(&location, &[], &peer_self, &server_id, now);
        cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), now, false);

        // Declaring the originator as already retrieved filters the bundle out.
        let res = cache.on_get(&location, &[originator_id], &peer_self, &server_id, now);
        assert!(res.cached_items.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
        let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);

        // No prior get means no entry, so the upload is rejected.
        let accepted = cache.on_upload(Id::random(), Id::random(), Bytes::from_static(b"feedback"), TimeMillis(1_000_000), false);
        assert!(!accepted);

        Ok(())
    }
}