1use crate::anyhow_assert_eq;
20use crate::client::client_storage::client_storage;
21use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_PEER};
22use crate::client::peer_tracker::peer_iterator::PeerIterator;
23use crate::protocol::payload::payload::{BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind};
24use crate::protocol::peer::Peer;
25use crate::protocol::rpc;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::tools::LeadingAgreementBits;
28use crate::tools::types::Id;
29use crate::tools::{config, json, tools};
30use log::{info, trace, warn};
31use std::sync::Arc;
32
33pub struct PeerTracker {
48 runtime_services: Arc<RuntimeServices>,
49 client_storage: Arc<dyn ClientStorage>,
50 peers_need_flush: bool,
51 peers: Vec<Peer>,
52}
53
54impl PeerTracker {
55 pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>) -> anyhow::Result<Self> {
56 let peers: anyhow::Result<Vec<Peer>> = try {
57 let peers = client_storage::get_struct::<Vec<Peer>>(client_storage.as_ref(), BUCKET_PEER, "peers", runtime_services.time_provider.current_time_millis()).await?;
58 match peers {
59 Some(peers) => {
60 info!("PeerTracker is starting with {} peers", peers.len());
61 trace!("{:?}", peers);
62 peers
63 }
64 None => Vec::new(),
65 }
66 };
67
68 let peers = peers.unwrap_or_else(|e| {
69 warn!("Failed to load peers from storage: {}", e);
70 Vec::new()
71 });
72
73 Ok(Self {
74 runtime_services,
75 client_storage,
76 peers_need_flush: false,
77 peers,
78 })
79 }
80
81 pub async fn flush(&mut self) -> anyhow::Result<()> {
82 if !self.peers_need_flush {
83 return Ok(());
84 }
85
86 trace!("Flushing peers to storage");
87 self.peers_need_flush = false;
88 client_storage::put_struct(self.client_storage.as_ref(), BUCKET_PEER, "peers", &self.peers, self.runtime_services.time_provider.current_time_millis()).await?;
89
90 Ok(())
91 }
92
93 pub fn add_peer(&mut self, peer: Peer) -> anyhow::Result<()> {
94 if let Err(e) = peer.verify() {
96 anyhow::bail!("peer verification error: {}", e);
97 }
98
99 if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
101 anyhow::bail!("peer peer.pow_initial.pow={} < {}", peer.pow_initial.pow, config::SERVER_KEY_POW_MIN);
102 }
103
104 let search_result = self.peers.binary_search_by_key(&peer.id, |peer| peer.id);
105 match search_result {
106 Ok(i) => {
107 assert_eq!(peer.id, self.peers[i].id);
109
110 if peer.timestamp > self.peers[i].timestamp {
112 self.peers[i] = peer;
113 }
114 }
115 Err(i) => {
116 self.peers.insert(i, peer);
117 }
118 }
119
120 self.peers_need_flush = true;
121
122 Ok(())
123 }
124
125 pub fn remove_peer(&mut self, peer: &Peer) {
126 if let Ok(i) = self.peers.binary_search_by_key(&peer.id, |peer| peer.id) {
127 self.peers.remove(i);
128 self.peers_need_flush = true;
129 }
130 }
131
132 pub fn is_empty(&self) -> bool {
133 self.peers.is_empty()
134 }
135 pub fn len(&self) -> usize {
136 self.peers.len()
137 }
138
139 pub fn peers(&self) -> &Vec<Peer> {
140 &self.peers
141 }
142
143 pub async fn iterate_to_location(&mut self, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> anyhow::Result<PeerIterator<'_>> {
144 self.bootstrap().await?;
145
146 Ok(PeerIterator::new(self, bucket_location_id, max_iterations_since_high_watermark, cache_radius))
147 }
148
149 pub async fn bootstrap(&mut self) -> anyhow::Result<()> {
150 if !self.is_empty() {
152 return Ok(());
153 }
154
155 info!("bootstrapping PeerTracker");
156
157 let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
159 tools::shuffle(&mut bootstrap_addresses);
160
161 for bootstrap_address in bootstrap_addresses {
163 let try_result: anyhow::Result<()> = try {
164 {
165 info!("bootstrapping {}", bootstrap_address);
166
167 let request = json::struct_to_bytes(&BootstrapV1 {})?;
168 let response = rpc::rpc::rpc_server_unknown(&self.runtime_services, &Id::zero(), &bootstrap_address, PayloadRequestKind::BootstrapV1, request).await?;
169 anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &response.response_request_kind);
170 let response = json::bytes_to_struct::<BootstrapResponseV1>(&response.bytes)?;
171 for peer in response.peers_random {
172 let result = self.add_peer(peer);
173 if let Err(e) = result {
174 warn!("problem while adding bootstrapped peer: {}", e);
175 }
176 }
177 }
178 };
179
180 if let Err(e) = try_result {
181 warn!("problem bootstrapping peer {}: {}", bootstrap_address, e);
182 }
183
184 if !self.is_empty() {
186 break;
187 }
188 }
189
190 Ok(())
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use crate::client::client_storage::mem_client_storage::MemClientStorage;
197 use crate::client::peer_tracker::peer_tracker::PeerTracker;
198 use crate::tools::buckets::{generate_bucket_location, BucketLocation, BucketType, BUCKET_DURATIONS};
199 use crate::tools::config;
200 use crate::tools::runtime_services::RuntimeServices;
201 use crate::tools::server_id::ServerId;
202 use crate::tools::time::{DurationMillis, TimeMillis};
203 use crate::tools::types::{Id, Pow};
204
205 #[tokio::test]
206 async fn general_tests() -> anyhow::Result<()> {
207 let runtime_services = RuntimeServices::default_for_testing();
208 let client_storage = MemClientStorage::new().await?;
209 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
210
211 assert!(peer_tracker.is_empty());
212 assert_eq!(0, peer_tracker.len());
213
214 {
216 loop {
218 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
219
220 if server_id.pow >= config::SERVER_KEY_POW_MIN {
222 continue;
223 }
224
225 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
226 let result = peer_tracker.add_peer(peer);
227 assert!(result.is_err());
228 assert_eq!(0, peer_tracker.len());
229
230 break;
231 }
232 }
233
234 {
236 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
237 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
238
239 {
240 let result = peer_tracker.add_peer(peer);
241 assert!(result.is_ok());
242 assert_eq!(1, peer_tracker.len());
243 }
244 }
245
246 {
248 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
249 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
250
251 {
252 let result = peer_tracker.add_peer(peer.clone());
253 assert!(result.is_ok());
254 assert_eq!(2, peer_tracker.len());
255 }
256 {
257 let result = peer_tracker.add_peer(peer.clone());
258 assert!(result.is_ok());
259 assert_eq!(2, peer_tracker.len());
260 }
261 }
262
263 {
265 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
266 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
267
268 {
269 let result = peer_tracker.add_peer(peer.clone());
270 assert!(result.is_ok());
271 assert_eq!(3, peer_tracker.len());
272 }
273
274 {
275 peer_tracker.remove_peer(&peer);
276 assert_eq!(2, peer_tracker.len());
277 }
278 }
279
280 {
282 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
283 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
284
285 {
286 peer_tracker.remove_peer(&peer);
287 assert_eq!(2, peer_tracker.len());
288 }
289 }
290
291 Ok(())
292 }
293
294 #[tokio::test]
295 async fn converge_basics_test() -> anyhow::Result<()> {
296 let runtime_services = RuntimeServices::default_for_testing();
297 let client_storage = MemClientStorage::new().await?;
298 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
299 const NUM_PEERS: usize = 100;
302
303 {
304 for _ in 0..NUM_PEERS {
305 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
306 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
307 peer_tracker.add_peer(peer)?;
308 }
309 assert_eq!(NUM_PEERS, peer_tracker.len());
310 }
311
312 {
313 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
314 let mut count = 0;
315 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
316 while let Some(_peer) = peer_iter.next_peer() { count += 1; }
317 assert_eq!(NUM_PEERS, count);
318 };
319
320 {
321 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
322 let mut count = 0;
323 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
324 while let Some(_peer) = peer_iter.next_peer() { count += 1; break; }
325 assert_eq!(1, count);
326 };
327
328 {
329 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
330 let mut count = 0;
331 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
332 while let Some((peer, _)) = peer_iter.next_peer() {
333 count += 1;
334 if 0 == count % 2 { peer_iter.remove_peer(&peer); }
335 }
336 assert_eq!(NUM_PEERS, count);
337 assert_eq!(NUM_PEERS / 2, peer_tracker.len());
338 }
339
340 Ok(())
341 }
342
343 #[tokio::test]
344 async fn converge_termination_test() -> anyhow::Result<()> {
345 let runtime_services = RuntimeServices::default_for_testing();
346 let client_storage = MemClientStorage::new().await?;
347 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
348
349 const NUM_PEERS: usize = 100;
350
351 {
352 for _ in 0..NUM_PEERS {
353 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
354 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
355 peer_tracker.add_peer(peer)?;
356 }
357 assert_eq!(NUM_PEERS, peer_tracker.len());
358 }
359
360 {
361 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
362 let mut count = 0;
363 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
364 while let Some(_peer) = peer_iter.next_peer() { count += 1; }
365 assert_eq!(3 + 1, count);
366 }
367
368 Ok(())
369 }
370
371 #[tokio::test]
372 async fn converge_insertions_test() -> anyhow::Result<()> {
373 let runtime_services = RuntimeServices::default_for_testing();
374 let client_storage = MemClientStorage::new().await?;
375 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
376
377 const NUM_PEERS: usize = 100;
378
379 {
380 for _ in 0..NUM_PEERS {
381 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
382 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
383 peer_tracker.add_peer(peer)?;
384 }
385 assert_eq!(NUM_PEERS, peer_tracker.len());
386 }
387
388 {
389 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
390 let mut count = 0;
391 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
392 while let Some(_peer) = peer_iter.next_peer() {
393 count += 1;
394 if 0 == count % 10 {
395 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
396 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
397 peer_iter.add_peers(vec![peer]);
398 }
399 if 50 == count { break; }
400 }
401 assert_eq!(50, count);
402 assert_eq!(NUM_PEERS + 5, peer_tracker.len());
403 }
404
405 Ok(())
406 }
407
408 #[tokio::test]
409 async fn converge_targeting_test() -> anyhow::Result<()> {
410 let runtime_services = RuntimeServices::default_for_testing();
411 let client_storage = MemClientStorage::new().await?;
412 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
413
414 const NUM_PEERS: usize = 100;
415
416 {
417 for _ in 0..NUM_PEERS {
418 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
419 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
420 peer_tracker.add_peer(peer)?;
421 }
422 assert_eq!(NUM_PEERS, peer_tracker.len());
423 }
424
425 let target_server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
426 let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;
427
428 {
429 const PEER_DISCOVERY_I: usize = 37usize;
430 const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;
431
432 let bucket_location = {
433 let mut location_id = target_peer.id.clone();
434 for i in 10..31 { location_id.0[i] = 0u8; }
435 BucketLocation {
436 bucket_type: BucketType::User,
437 base_id: location_id.clone(),
438 duration: DurationMillis::zero(),
439 bucket_time_millis: TimeMillis::zero(),
440 location_id: location_id.clone(),
441 }
442 };
443
444 let mut count = 0;
445 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
446 while let Some((peer, _)) = peer_iter.next_peer() {
447 count += 1;
448 match count {
449 PEER_DISCOVERY_I => { peer_iter.add_peers(vec![target_peer.clone()]); }
450 PEER_DISCOVERY_I_PLUS_1 => {
451 if peer.id != target_peer.id { anyhow::bail!("peer is not the one we expected"); }
452 break;
453 }
454 _ => {}
455 }
456 }
457 assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
458 assert_eq!(NUM_PEERS + 1, peer_tracker.len());
459 }
460
461 Ok(())
462 }
463
464 #[tokio::test]
467 async fn converge_cache_radius_test() -> anyhow::Result<()> {
468 let runtime_services = RuntimeServices::default_for_testing();
469 let client_storage = MemClientStorage::new().await?;
470 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
471
472 let location_id = Id::zero();
473
474 let make_peer_with_lab = |lab_bits: usize| -> anyhow::Result<crate::protocol::peer::Peer> {
475 let mut id_bytes = [0u8; 32];
476 let byte_idx = lab_bits / 8;
477 let bit_idx = 7 - (lab_bits % 8);
478 id_bytes[byte_idx] = 1u8 << bit_idx;
479 let id = Id(id_bytes);
480 let _ = id;
481 anyhow::bail!("use direct ServerId below")
482 };
483 let _ = make_peer_with_lab;
484
485 const NUM_PEERS: usize = 100;
486 let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
487 for _ in 0..NUM_PEERS {
488 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
489 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
490 let lab = crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0);
491 labs_added.push(lab);
492 peer_tracker.add_peer(peer)?;
493 }
494 assert_eq!(NUM_PEERS, peer_tracker.len());
495
496 let mut sorted_labs = labs_added.clone();
497 sorted_labs.sort();
498 let cache_radius = sorted_labs[NUM_PEERS / 2];
499
500 let mut labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
501 let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
502 while let Some((_, lab)) = peer_iter.next_peer() { labs_visited.push(lab); }
503
504 assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");
505
506 let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
507 if has_outside_peers {
508 assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
509 }
510
511 Ok(())
512 }
513}