use bytes::Bytes;
19use hashiverse_lib::protocol::payload::payload::CacheRequestTokenV1;
20use hashiverse_lib::protocol::peer::Peer;
21use hashiverse_lib::tools::buckets::BucketLocation;
22use hashiverse_lib::tools::server_id::ServerId;
23use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE};
24use hashiverse_lib::tools::tools::leading_agreement_bits_xor;
25use hashiverse_lib::tools::types::Id;
26use moka::sync::Cache;
27use std::collections::HashMap;
28use std::sync::{Arc, Mutex};
29
30use crate::server::post_bundle_caching_shared::{CachedBundle, GetCacheResult, CACHE_HIT_THRESHOLD, CACHE_LOCATION_TTI, CACHE_REQUEST_TOKEN_TTL_DURATION, CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS};
31
// Weight charged to the byte-budgeted `bundles` cache for a location entry
// that holds no bundle bytes yet (the placeholder created by `on_get` before
// any upload arrives). NOTE(review): presumably sized so empty entries still
// count meaningfully against `max_capacity` — confirm against sizing docs.
const POST_BUNDLE_PLACEHOLDER_WEIGHT: u32 = 4 * 1024 * 1024;

// Per-location cache state stored behind an `Arc<Mutex<…>>` in `PostBundleCache`.
struct CachedPostBundleLocationEntry {
    // Cached bundles for this location, keyed by the originator peer's id.
    bundles: HashMap<Id, CachedBundle>,
    // Number of `on_get` calls seen for this location; compared against
    // `CACHE_HIT_THRESHOLD` to decide when a cache-request token is issued.
    hit_count: u32,
}
47
48impl CachedPostBundleLocationEntry {
49 fn placeholder() -> Self {
50 Self { bundles: HashMap::new(), hit_count: 0 }
51 }
52
53 fn weight(&self) -> u32 {
54 let total: u32 = self.bundles.values().map(|b| b.bytes.len() as u32).sum();
55 if total == 0 { POST_BUNDLE_PLACEHOLDER_WEIGHT } else { total }
56 }
57}
58
// Byte-bounded cache of post bundles keyed by location id.
//
// `bundles` maps a location id to its mutex-guarded entry; `inflight`
// remembers locations for which a cache-request token was recently issued so
// duplicate tokens are suppressed for the token's TTL.
pub struct PostBundleCache {
    // Maximum number of distinct originators cached per location before
    // eviction kicks in (see the eviction loop in `on_upload`).
    max_originators_per_location: usize,
    bundles: Cache<Id, Arc<Mutex<CachedPostBundleLocationEntry>>>,
    // Presence of a key means a token for that location is still outstanding.
    inflight: Cache<Id, ()>,
}
77
78impl PostBundleCache {
79 pub fn new(max_originators_per_location: usize, max_bytes: u64) -> Self {
80 let bundles = Cache::builder()
81 .weigher(|_key: &Id, entry: &Arc<Mutex<CachedPostBundleLocationEntry>>| {
82 entry.lock().map(|e| e.weight()).unwrap_or(POST_BUNDLE_PLACEHOLDER_WEIGHT)
83 })
84 .max_capacity(max_bytes)
85 .time_to_idle(CACHE_LOCATION_TTI)
86 .build();
87
88 let inflight = Cache::builder()
89 .time_to_live(CACHE_REQUEST_TOKEN_TTL_DURATION)
90 .build();
91
92 Self { max_originators_per_location, bundles, inflight }
93 }
94
95 pub fn on_get(
102 &self,
103 bucket_location: &BucketLocation,
104 already_retrieved_peer_ids: &[Id],
105 peer_self: &Peer,
106 server_id: &ServerId,
107 now: TimeMillis,
108 ) -> GetCacheResult {
109 let location_id = bucket_location.location_id;
110 let entry_arc = self.bundles.get_with(location_id, || Arc::new(Mutex::new(CachedPostBundleLocationEntry::placeholder())));
111
112 let (cached_items, already_cached_peer_ids, should_issue_token) = {
113 let mut entry = entry_arc.lock().unwrap();
114 entry.hit_count += 1;
115
116 let already_retrieved_set: std::collections::HashSet<Id> = already_retrieved_peer_ids.iter().copied().collect();
117 let cached_items: Vec<Bytes> = entry.bundles
118 .iter()
119 .filter(|(originator_id, bundle)| !already_retrieved_set.contains(originator_id) && !bundle.is_stale(now))
120 .map(|(_, bundle)| bundle.bytes.clone())
121 .collect();
122
123 let already_cached_peer_ids: Vec<Id> = entry.bundles.keys().copied().collect();
124 let should_issue_token = entry.hit_count >= CACHE_HIT_THRESHOLD && !self.inflight.contains_key(&location_id);
125 (cached_items, already_cached_peer_ids, should_issue_token)
126 };
127
128 let cache_request_token = if should_issue_token {
129 self.inflight.insert(location_id, ());
130 let expires_at = now + CACHE_REQUEST_TOKEN_TTL_DURATION_MILLIS;
131 Some(CacheRequestTokenV1::new(peer_self.clone(), bucket_location.clone(), expires_at, already_cached_peer_ids, &server_id.keys.signature_key))
132 } else {
133 None
134 };
135
136 GetCacheResult { cached_items, cache_request_token }
137 }
138
139 pub fn on_upload(
143 &self,
144 location_id: Id,
145 originator_peer_id: Id,
146 bundle_bytes: Bytes,
147 server_time: TimeMillis,
148 is_sealed: bool,
149 ) -> bool {
150 let entry_arc = match self.bundles.get(&location_id) {
151 Some(e) => e,
152 None => return false, };
154
155 let mut entry = entry_arc.lock().unwrap();
156 let expires_at = if is_sealed { None } else { Some(server_time + MILLIS_IN_MINUTE.const_mul(5)) };
157 let bundle = CachedBundle { bytes: bundle_bytes, expires_at };
158
159 entry.bundles.insert(originator_peer_id, bundle);
161
162 while entry.bundles.len() > self.max_originators_per_location {
167 let evict_key = entry.bundles
168 .iter()
169 .min_by(|(id_a, bundle_a), (id_b, bundle_b)| {
170 let distance_a = leading_agreement_bits_xor(id_a.as_ref(), location_id.as_ref());
171 let distance_b = leading_agreement_bits_xor(id_b.as_ref(), location_id.as_ref());
172 distance_a.cmp(&distance_b).then_with(|| {
173 let expires_a = bundle_a.expires_at.unwrap_or(TimeMillis(i64::MAX));
174 let expires_b = bundle_b.expires_at.unwrap_or(TimeMillis(i64::MAX));
175 expires_a.cmp(&expires_b)
176 })
177 })
178 .map(|(id, _)| *id);
179 if let Some(k) = evict_key {
180 entry.bundles.remove(&k);
181 }
182 }
183
184 entry.bundles.contains_key(&originator_peer_id)
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
    use hashiverse_lib::tools::server_id::ServerId;
    use hashiverse_lib::tools::time::TimeMillis;
    use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
    use hashiverse_lib::tools::parallel_pow_generator::StubParallelPowGenerator;
    use hashiverse_lib::tools::types::{Id, Pow};

    /// Builds a throwaway server identity (needed for token signing) together
    /// with its self-peer.
    async fn make_test_server_and_peer() -> anyhow::Result<(ServerId, hashiverse_lib::protocol::peer::Peer)> {
        let clock = RealTimeProvider::default();
        let pow = StubParallelPowGenerator::new();
        let server_id = ServerId::new(&clock, Pow(0), true, &pow).await?;
        let self_peer = server_id.to_peer(&clock)?;
        Ok((server_id, self_peer))
    }

    /// A random user-bucket location anchored at a fixed timestamp.
    fn make_test_bucket_location() -> BucketLocation {
        BucketLocation::new(BucketType::User, Id::random(), BUCKET_DURATIONS[0], TimeMillis(1_000_000)).unwrap()
    }

    #[tokio::test]
    async fn test_below_threshold_no_token() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Every get strictly below the threshold yields neither a token nor
        // any cached items.
        for _hit in 1..CACHE_HIT_THRESHOLD {
            let outcome = cache.on_get(&location, &[], &peer_self, &server_id, now);
            assert!(outcome.cache_request_token.is_none());
            assert!(outcome.cached_items.is_empty());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_at_threshold_token_issued_then_deduplicated() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);

        // Warm up to one below the threshold; still no token.
        for _hit in 1..CACHE_HIT_THRESHOLD {
            assert!(cache.on_get(&location, &[], &peer_self, &server_id, now).cache_request_token.is_none());
        }

        // The threshold-reaching get issues a token...
        assert!(cache.on_get(&location, &[], &peer_self, &server_id, now).cache_request_token.is_some());

        // ...and the immediately following get is deduplicated.
        assert!(cache.on_get(&location, &[], &peer_self, &server_id, now).cache_request_token.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_and_retrieval() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);
        let originator = Id::random();
        let payload = Bytes::from_static(b"test_bundle");

        // A get creates the placeholder entry so the upload has somewhere to land.
        cache.on_get(&location, &[], &peer_self, &server_id, now);

        let accepted = cache.on_upload(location.location_id, originator, payload.clone(), now, false);
        assert!(accepted);

        // The next get returns the uploaded bundle.
        let outcome = cache.on_get(&location, &[], &peer_self, &server_id, now);
        assert_eq!(outcome.cached_items, vec![payload]);

        Ok(())
    }

    #[tokio::test]
    async fn test_already_retrieved_filtered() -> anyhow::Result<()> {
        let (server_id, peer_self) = make_test_server_and_peer().await?;
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);
        let location = make_test_bucket_location();
        let now = TimeMillis(1_000_000);
        let originator = Id::random();

        cache.on_get(&location, &[], &peer_self, &server_id, now);
        cache.on_upload(location.location_id, originator, Bytes::from_static(b"bundle"), now, false);

        // A caller that already holds this originator's bundle gets nothing back.
        let outcome = cache.on_get(&location, &[originator], &peer_self, &server_id, now);
        assert!(outcome.cached_items.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_upload_returns_false_when_not_in_cache() -> anyhow::Result<()> {
        let cache = PostBundleCache::new(5, 64 * 1024 * 1024);

        // Without a prior get there is no entry, so the upload is rejected.
        let accepted = cache.on_upload(Id::random(), Id::random(), Bytes::from_static(b"bundle"), TimeMillis(1_000_000), false);
        assert!(!accepted);

        Ok(())
    }
}