hashiverse_server_lib/server/
post_bundle_feedback_caching.rs1use bytes::Bytes;
15use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
16use hashiverse_lib::protocol::peer::Peer;
17use hashiverse_lib::tools::buckets::BucketLocation;
18use hashiverse_lib::tools::server_id::ServerId;
19use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
20use hashiverse_lib::tools::types::Id;
21use moka::sync::Cache;
22use std::sync::{Arc, Mutex};
23
24use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
25
26const POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT: u32 = 8 * 1024;
29
30struct CachedPostBundleFeedbackLocationEntry {
38 bundle: Option<(Id, CachedBundle)>,
40 hit_count: u32,
41}
42
43impl CachedPostBundleFeedbackLocationEntry {
44 fn placeholder() -> Self {
45 Self { bundle: None, hit_count: 0 }
46 }
47
48 fn weight(&self) -> u32 {
49 self.bundle.as_ref().map(|(_, b)| b.bytes.len() as u32).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
50 }
51}
52
53pub struct PostBundleFeedbackCache {
64 bundles: Cache<Id, Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>>,
65 inflight: Cache<Id, ()>,
66}
67
68impl PostBundleFeedbackCache {
69 pub fn new(max_bytes: u64) -> Self {
70 let bundles = Cache::builder()
71 .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleFeedbackLocationEntry>>| {
72 entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_FEEDBACK_PLACEHOLDER_WEIGHT)
73 })
74 .max_capacity(max_bytes)
75 .time_to_idle(CACHE_LOCATION_TTI)
76 .build();
77
78 let inflight = Cache::builder()
79 .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
80 .build();
81
82 Self { bundles, inflight }
83 }
84
85 pub fn on_get(
87 &self,
88 bucket_location: &BucketLocation,
89 already_retrieved_peer_ids: &[Id],
90 peer_self: &Peer,
91 server_id: &ServerId,
92 now: TimeMillis,
93 ) -> GetCacheResult {
94 let location_id = bucket_location.location_id;
95 let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleFeedbackLocationEntry::placeholder())));
96
97 let (cached_items, already_cached_peer_ids, should_issue_token) = {
98 let mut entry = entry_arc.lock().unwrap();
99 entry.hit_count += 1;
100
101 let cached_items: Vec<Bytes> = entry.bundle
102 .iter()
103 .filter(|(originator_id, bundle)| !already_retrieved_peer_ids.contains(originator_id) && !bundle.is_stale(now))
104 .map(|(_, bundle)| bundle.bytes.clone())
105 .collect();
106
107 let already_cached_peer_ids: Vec<Id> = entry.bundle.iter().map(|(id, _)| *id).collect();
108 let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
109 (cached_items, already_cached_peer_ids, should_issue_token)
110 };
111
112 let cache_request_token = if should_issue_token {
113 self.inflight.insert(location_id, ());
114 let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
115 Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
116 } else {
117 None
118 };
119
120 GetCacheResult { cached_items, cache_request_token }
121 }
122
123 pub fn on_upload(
127 &self,
128 location_id: Id,
129 originator_peer_id: Id,
130 feedback_bytes: Bytes,
131 server_time: TimeMillis,
132 is_sealed: bool,
133 ) -> bool {
134 let entry_arc = match self.bundles.get(&location_id) {
135 Some(e) => e,
136 None => return false,
137 };
138
139 let mut entry = entry_arc.lock().unwrap();
140 let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
141 entry.bundle = Some((originator_peer_id, CachedBundle { bytes: feedback_bytes, expires_at }));
142 true
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use super::*;
149 use bytes::Bytes;
150 use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
151 use hashiverse_lib::tools::server_id::ServerId;
152 use hashiverse_lib::tools::time::TimeMillis;
153 use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
154 use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
155 use hashiverse_lib::tools::types::{Id, Pow};
156
157 async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, Peer)> {
158 let time_provider = RealTimeProvider::default();
159 let pow_generator = StubParallelPowGenerator::new();
160 let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
161 let peer = server_id.to_peer(&time_provider)?;
162 Ok((server_id, peer))
163 }
164
165 fn make_test_bucket_location() -> BucketLocation {
166 BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000)).unwrap()
167 }
168
169 #[tokio::test]
170 async fn test_below_threshold_no_token() -> anyhow::Result<()> {
171 let (server_id, peer_self) = make_test_server_and_peer().await?;
172 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
173 let bucket_location = make_test_bucket_location();
174 let now = TimeMillis(1_000_000);
175
176 for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
177 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
178 assert!(result.cache_request_token.is_none());
179 assert!(result.cached_items.is_empty());
180 }
181
182 Ok(())
183 }
184
185 #[tokio::test]
186 async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
187 let (server_id, peer_self) = make_test_server_and_peer().await?;
188 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
189 let bucket_location = make_test_bucket_location();
190 let now = TimeMillis(1_000_000);
191
192 for _ in 0..(CACHE_HIT_THRESHOLD - 1) {
193 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
194 assert!(result.cache_request_token.is_none());
195 }
196
197 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
198 assert!(result.cache_request_token.is_some());
199
200 let result2 = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
201 assert!(result2.cache_request_token.is_none());
202
203 Ok(())
204 }
205
206 #[tokio::test]
207 async fn test_upload_and_retrieval() -> anyhow::Result<()> {
208 let (server_id, peer_self) = make_test_server_and_peer().await?;
209 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
210 let bucket_location = make_test_bucket_location();
211 let location_id = bucket_location.location_id;
212 let now = TimeMillis(1_000_000);
213 let originator_id = Id::random();
214 let feedback_bytes = Bytes::from_static(b"test_feedback");
215
216 cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
217
218 let accepted = cache.on_upload(location_id, originator_id, feedback_bytes.clone(), now, false);
219 assert!(accepted);
220
221 let result = cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
222 assert_eq!(result.cached_items, vec![feedback_bytes]);
223
224 Ok(())
225 }
226
227 #[tokio::test]
228 async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
229 let (server_id, peer_self) = make_test_server_and_peer().await?;
230 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
231 let bucket_location = make_test_bucket_location();
232 let location_id = bucket_location.location_id;
233 let now = TimeMillis(1_000_000);
234 let originator_id = Id::random();
235
236 cache.on_get(&bucket_location, &[], &peer_self, &server_id, now);
237 cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), now, false);
238
239 let result = cache.on_get(&bucket_location, &[originator_id], &peer_self, &server_id, now);
240 assert!(result.cached_items.is_empty());
241
242 Ok(())
243 }
244
245 #[tokio::test]
246 async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
247 let cache = PostBundleFeedbackCache::new(16 * 1024 * 1024);
248 let location_id = Id::random();
249 let originator_id = Id::random();
250
251 let accepted = cache.on_upload(location_id, originator_id, Bytes::from_static(b"feedback"), TimeMillis(1_000_000), false);
252 assert!(!accepted);
253
254 Ok(())
255 }
256}