hashiverse_lib/client/peer_tracker/
peer_iterator.rs1use crate::client::peer_tracker::peer_tracker::PeerTracker;
20use crate::protocol::peer::Peer;
21use crate::tools::tools;
22use crate::tools::tools::LeadingAgreementBits;
23use crate::tools::types::Id;
24use log::warn;
25use std::collections::HashSet;
26
27pub struct PeerIterator<'a> {
28 tracker: &'a mut PeerTracker,
29 bucket_location_id: Id,
30 max_iterations_since_high_watermark: usize,
31 peers_already_queried: HashSet<Id>,
32 high_watermark: LeadingAgreementBits,
33 iterations_since_high_watermark: usize,
34 cache_radius: Option<LeadingAgreementBits>,
35}
36
37impl<'a> PeerIterator<'a> {
38 pub fn new(tracker: &'a mut PeerTracker, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> Self {
39 Self {
40 tracker,
41 bucket_location_id,
42 max_iterations_since_high_watermark,
43 peers_already_queried: HashSet::new(),
44 high_watermark: 0,
45 iterations_since_high_watermark: 0,
46 cache_radius: cache_radius,
47 }
48 }
49 pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
50 loop {
51 let nearest_peer = self
52 .tracker
53 .peers()
54 .iter()
55 .filter(|peer| !self.peers_already_queried.contains(&peer.id))
56 .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
57 .filter(|(_, lab)| self.cache_radius.map_or(true, |r| *lab < r))
58 .max_by_key(|peer| peer.1);
59
60 match nearest_peer {
61 Some(nearest_peer) => {
62 self.peers_already_queried.insert(nearest_peer.0.id);
63
64 if nearest_peer.1 > self.high_watermark {
65 self.high_watermark = nearest_peer.1;
66 self.iterations_since_high_watermark = 0;
67 }
68 else {
69 self.iterations_since_high_watermark += 1;
70 if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
71 return None;
72 }
73 }
74
75 if let Some(r) = &mut self.cache_radius {
77 *r = (*r + 1).min(256);
78 }
79
80 return Some((nearest_peer.0.clone(), nearest_peer.1));
81 }
82 None => {
83 let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
86 if !any_unvisited {
87 return None;
88 }
89 match &mut self.cache_radius {
91 Some(r) => *r = (*r + 1).min(256),
92 None => return None,
93 }
94 }
95 }
96 }
97 }
98
99 pub fn iterations_since_high_watermark(&self) -> usize {
100 self.iterations_since_high_watermark
101 }
102
103 pub fn add_peers(&mut self, peers: Vec<Peer>) {
104 for peer in peers {
105 if let Err(e) = self.tracker.add_peer(peer) {
106 warn!("not adding invalid peer: {}", e);
107 }
108 }
109 }
110 pub fn remove_peer(&mut self, peer: &Peer) {
111 self.tracker.remove_peer(peer);
112 }
113}
114
115pub struct ConvergeToLocationVisitResult {
116 pub done: bool, pub peer_unavailable: bool, pub peers_discovered: Vec<Peer>, }
120
121#[async_trait::async_trait]
122pub trait ConvergeToLocationVisitor: Send + Sync {
123 async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
124}
125
126#[cfg(test)]
127mod tests {
128 use crate::client::client_storage::mem_client_storage::MemClientStorage;
129 use crate::client::peer_tracker::peer_tracker::PeerTracker;
130 use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
131 use crate::tools::config;
132 use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
133 use crate::tools::runtime_services::RuntimeServices;
134 use crate::tools::server_id::ServerId;
135 use crate::tools::time::{DurationMillis, TimeMillis};
136 use crate::tools::time_provider::time_provider::RealTimeProvider;
137 use crate::tools::types::{Id, Pow};
138 use crate::transport::mem_transport::MemTransportFactory;
139 use std::sync::Arc;
140
141 fn get_test_runtime_services() -> Arc<RuntimeServices> {
142 Arc::new(RuntimeServices {
143 time_provider: Arc::new(RealTimeProvider::default()),
144 transport_factory: MemTransportFactory::default(),
145 pow_generator: Arc::new(StubParallelPowGenerator::new()),
146 })
147 }
148
149 #[tokio::test]
150 async fn general_tests() -> anyhow::Result<()> {
151 let runtime_services = RuntimeServices::default_for_testing();
152 let client_storage = MemClientStorage::new().await?;
153 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
154
155 assert!(peer_tracker.is_empty());
156 assert_eq!(0, peer_tracker.len());
157
158 {
160 loop {
161 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
162 if server_id.pow >= config::SERVER_KEY_POW_MIN {
163 continue;
164 }
165 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
166 let result = peer_tracker.add_peer(peer);
167 assert!(result.is_err());
168 assert_eq!(0, peer_tracker.len());
169 break;
170 }
171 }
172
173 {
175 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
176 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
177 let result = peer_tracker.add_peer(peer);
178 assert!(result.is_ok());
179 assert_eq!(1, peer_tracker.len());
180 }
181
182 {
184 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
185 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
186 let result = peer_tracker.add_peer(peer.clone());
187 assert!(result.is_ok());
188 assert_eq!(2, peer_tracker.len());
189 let result = peer_tracker.add_peer(peer.clone());
190 assert!(result.is_ok());
191 assert_eq!(2, peer_tracker.len());
192 }
193
194 {
196 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
197 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
198 let result = peer_tracker.add_peer(peer.clone());
199 assert!(result.is_ok());
200 assert_eq!(3, peer_tracker.len());
201 peer_tracker.remove_peer(&peer);
202 assert_eq!(2, peer_tracker.len());
203 }
204
205 {
207 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
208 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
209 peer_tracker.remove_peer(&peer);
210 assert_eq!(2, peer_tracker.len());
211 }
212
213 Ok(())
214 }
215
216 #[tokio::test]
217 async fn converge_basics_test() -> anyhow::Result<()> {
218 let runtime_services = get_test_runtime_services();
219 let client_storage = MemClientStorage::new().await?;
220 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
221
222 const NUM_PEERS: usize = 100;
223
224 {
225 for _ in 0..NUM_PEERS {
226 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
227 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
228 peer_tracker.add_peer(peer)?;
229 }
230 assert_eq!(NUM_PEERS, peer_tracker.len());
231 }
232
233 {
235 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
236 let mut count = 0;
237 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
238 while let Some(_peer) = peer_iter.next_peer() {
239 count += 1;
240 }
241 assert_eq!(NUM_PEERS, count);
242 };
243
244 {
246 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
247 let mut count = 0;
248 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
249 while let Some(_peer) = peer_iter.next_peer() {
250 count += 1;
251 break;
252 }
253 assert_eq!(1, count);
254 };
255
256 {
258 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
259 let mut count = 0;
260 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
261 while let Some((peer, _)) = peer_iter.next_peer() {
262 count += 1;
263 if 0 == count % 2 {
264 peer_iter.remove_peer(&peer);
265 }
266 }
267 assert_eq!(NUM_PEERS, count);
268 assert_eq!(NUM_PEERS / 2, peer_tracker.len());
269 }
270
271 Ok(())
272 }
273
274 #[tokio::test]
275 async fn converge_termination_test() -> anyhow::Result<()> {
276 let runtime_services = get_test_runtime_services();
277 let client_storage = MemClientStorage::new().await?;
278 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
279
280 const NUM_PEERS: usize = 100;
281
282 {
283 for _ in 0..NUM_PEERS {
284 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
285 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
286 peer_tracker.add_peer(peer)?;
287 }
288 assert_eq!(NUM_PEERS, peer_tracker.len());
289 }
290
291 {
292 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
293 let mut count = 0;
294 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
295 while let Some(_peer) = peer_iter.next_peer() {
296 count += 1;
297 }
298 assert_eq!(3 + 1, count);
299 }
300
301 Ok(())
302 }
303
304 #[tokio::test]
305 async fn converge_insertions_test() -> anyhow::Result<()> {
306 let runtime_services = get_test_runtime_services();
307 let client_storage = MemClientStorage::new().await?;
308 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
309
310 const NUM_PEERS: usize = 100;
311
312 {
313 for _ in 0..NUM_PEERS {
314 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
315 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
316 peer_tracker.add_peer(peer)?;
317 }
318 assert_eq!(NUM_PEERS, peer_tracker.len());
319 }
320
321 {
323 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
324 let mut count = 0;
325 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
326 while let Some(_peer) = peer_iter.next_peer() {
327 count += 1;
328
329 if 0 == count % 10 {
330 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
331 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
332 peer_iter.add_peers(vec![peer]);
333 }
334
335 if 50 == count {
336 break;
337 }
338 }
339
340 assert_eq!(50, count);
341 assert_eq!(NUM_PEERS + 5, peer_tracker.len());
342 }
343
344 Ok(())
345 }
346
347 #[tokio::test]
348 async fn converge_targeting_test() -> anyhow::Result<()> {
349 let runtime_services = get_test_runtime_services();
350 let client_storage = MemClientStorage::new().await?;
351 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
352
353 const NUM_PEERS: usize = 100;
354
355 {
356 for _ in 0..NUM_PEERS {
357 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
358 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
359 peer_tracker.add_peer(peer)?;
360 }
361 assert_eq!(NUM_PEERS, peer_tracker.len());
362 }
363
364 let target_server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
366 let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;
367
368 {
369 const PEER_DISCOVERY_I: usize = 37usize;
370 const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;
371
372 let bucket_location = {
373 let mut location_id = target_peer.id.clone();
374 for i in 10..31 {
375 location_id.0[i] = 0u8;
376 }
377 BucketLocation {
378 bucket_type: BucketType::User,
379 base_id: location_id.clone(),
380 duration: DurationMillis::zero(),
381 bucket_time_millis: TimeMillis::zero(),
382 location_id: location_id.clone(),
383 }
384 };
385
386 let mut count = 0;
387 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
388 while let Some((peer, _)) = peer_iter.next_peer() {
389 count += 1;
390 match count {
391 PEER_DISCOVERY_I => {
392 peer_iter.add_peers(vec![target_peer.clone()]);
393 }
394 PEER_DISCOVERY_I_PLUS_1 => {
395 if peer.id != target_peer.id {
396 anyhow::bail!("peer is not the one we expected");
397 }
398 break;
399 }
400 _ => {}
401 }
402 }
403
404 assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
405 assert_eq!(NUM_PEERS + 1, peer_tracker.len());
406 }
407
408 Ok(())
409 }
410
411 #[tokio::test]
414 async fn converge_cache_radius_test() -> anyhow::Result<()> {
415 let runtime_services = get_test_runtime_services();
416 let client_storage = MemClientStorage::new().await?;
417 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
418
419 let location_id = Id::zero();
420
421 let make_peer_with_lab = |lab_bits: usize| -> anyhow::Result<crate::protocol::peer::Peer> {
422 let mut id_bytes = [0u8; 32];
423 let byte_idx = lab_bits / 8;
424 let bit_idx = 7 - (lab_bits % 8);
425 id_bytes[byte_idx] = 1u8 << bit_idx;
426 let id = Id(id_bytes);
427 let _ = id;
428 anyhow::bail!("use direct ServerId below")
429 };
430 let _ = make_peer_with_lab;
431
432 const NUM_PEERS: usize = 100;
433 let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
434 for _ in 0..NUM_PEERS {
435 let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
436 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
437 let lab = crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0);
438 labs_added.push(lab);
439 peer_tracker.add_peer(peer)?;
440 }
441 assert_eq!(NUM_PEERS, peer_tracker.len());
442
443 let mut sorted_labs = labs_added.clone();
444 sorted_labs.sort();
445 let cache_radius = sorted_labs[NUM_PEERS / 2];
446
447 let mut labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
448 let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
449 while let Some((_, lab)) = peer_iter.next_peer() {
450 labs_visited.push(lab);
451 }
452
453 assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");
454
455 let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
456 if has_outside_peers {
457 assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
458 }
459
460 Ok(())
461 }
462}