hashiverse_lib/client/peer_tracker/peer_iterator.rs

//! # Stateful peer walk by XOR distance
//!
//! `PeerIterator` is the execution primitive for every DHT operation: given a target
//! [`crate::tools::types::Id`] and a
//! [`crate::client::peer_tracker::peer_tracker::PeerTracker`], it hands back peers in order
//! of decreasing closeness while remembering which ones have already been tried.
//!
//! Two knobs tune the walk (see the usage sketch below):
//!
//! - **High-watermark** on [`crate::tools::tools::LeadingAgreementBits`] — once `N`
//!   iterations have passed without finding a peer closer than the best so far, the
//!   iterator gives up. This is the standard Kademlia "no further progress possible"
//!   signal.
//! - **Cache radius** — supplied by
//!   [`crate::client::caching::cache_radius_tracker`]. Peers closer than the recorded
//!   cache radius are skipped, because whatever we'd fetch from them is already cached
//!   further out; fetching again just hammers the closest nodes.
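//!
//! ## Example
//!
//! A rough sketch of how a lookup typically drives the iterator (illustrative only;
//! `query_peer` stands in for whatever network call the surrounding operation makes):
//!
//! ```ignore
//! // Walk toward `target`, giving up after three returned peers without progress.
//! let mut iter = peer_tracker.iterate_to_location(target, 3, None).await?;
//! while let Some((peer, agreement_bits)) = iter.next_peer() {
//!     // `agreement_bits` is how many leading bits `peer.id` shares with `target`.
//!     match query_peer(&peer, &target).await {
//!         Ok(discovered) => iter.add_peers(discovered), // fold newly learned peers into the walk
//!         Err(_) => iter.remove_peer(&peer),            // drop peers that do not respond
//!     }
//! }
//! ```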

use crate::client::peer_tracker::peer_tracker::PeerTracker;
use crate::protocol::peer::Peer;
use crate::tools::tools;
use crate::tools::tools::LeadingAgreementBits;
use crate::tools::types::Id;
use log::warn;
use std::collections::HashSet;

pub struct PeerIterator<'a> {
    /// Source of candidate peers.
    tracker: &'a mut PeerTracker,
    /// The location we are converging toward.
    bucket_location_id: Id,
    /// Give up once this many peers in a row have been returned without getting closer.
    max_iterations_since_high_watermark: usize,
    /// Peers already handed out; they are never returned twice.
    peers_already_queried: HashSet<Id>,
    /// Highest leading-agreement-bits seen so far.
    high_watermark: LeadingAgreementBits,
    /// Peers returned since `high_watermark` last improved.
    iterations_since_high_watermark: usize,
    /// Peers at or inside this radius are skipped until the radius is widened.
    cache_radius: Option<LeadingAgreementBits>,
}
impl<'a> PeerIterator<'a> {
    pub fn new(
        tracker: &'a mut PeerTracker,
        bucket_location_id: Id,
        max_iterations_since_high_watermark: usize,
        cache_radius: Option<LeadingAgreementBits>,
    ) -> Self {
        Self {
            tracker,
            bucket_location_id,
            max_iterations_since_high_watermark,
            peers_already_queried: HashSet::new(),
            high_watermark: 0,
            iterations_since_high_watermark: 0,
            cache_radius,
        }
    }
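
    /// Return the closest not-yet-queried peer to `bucket_location_id` together with its
    /// leading-agreement-bits, or `None` once the walk has converged: either too many peers
    /// have been returned without beating the high-watermark, or no eligible peers remain.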
    pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
        loop {
            // "Closest" means the most leading agreement bits with the target location.
            let nearest_peer = self
                .tracker
                .peers()
                .iter()
                .filter(|peer| !self.peers_already_queried.contains(&peer.id))
                .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
                .filter(|(_, lab)| self.cache_radius.map_or(true, |r| *lab < r))
                .max_by_key(|peer| peer.1);

            match nearest_peer {
                Some(nearest_peer) => {
                    self.peers_already_queried.insert(nearest_peer.0.id);

                    if nearest_peer.1 > self.high_watermark {
                        self.high_watermark = nearest_peer.1;
                        self.iterations_since_high_watermark = 0;
                    } else {
                        self.iterations_since_high_watermark += 1;
                        if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
                            // Too long without progress: the walk has converged.
                            return None;
                        }
                    }

                    // Each successful return opens one more ring of closer peers on the next call.
                    if let Some(r) = &mut self.cache_radius {
                        *r = (*r + 1).min(256);
                    }

                    return Some((nearest_peer.0.clone(), nearest_peer.1));
                }
                None => {
                    // No unvisited peer passes the current radius filter.
                    // If there are no unvisited peers at all, we are done.
                    let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
                    if !any_unvisited {
                        return None;
                    }
                    // Allow the next ring of closer peers and retry.
                    match &mut self.cache_radius {
                        Some(r) => *r = (*r + 1).min(256),
                        None => return None,
                    }
                }
            }
        }
    }

    pub fn iterations_since_high_watermark(&self) -> usize {
        self.iterations_since_high_watermark
    }

    /// Feed newly discovered peers into the underlying tracker so that later calls to
    /// [`Self::next_peer`] can consider them; peers the tracker rejects are logged and dropped.
    pub fn add_peers(&mut self, peers: Vec<Peer>) {
        for peer in peers {
            if let Err(e) = self.tracker.add_peer(peer) {
                warn!("not adding invalid peer: {}", e);
            }
        }
    }

    /// Remove a peer from the underlying tracker, e.g. after it failed to respond.
    pub fn remove_peer(&mut self, peer: &Peer) {
        self.tracker.remove_peer(peer);
    }
}

pub struct ConvergeToLocationVisitResult {
    pub done: bool,                  // Stop iterating
    pub peer_unavailable: bool,      // This peer has a problem of some sort and should be removed
    pub peers_discovered: Vec<Peer>, // Newly discovered peers to fold into the iteration
}

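/// Callback invoked once per visited peer while converging toward a location. The returned
/// [`ConvergeToLocationVisitResult`] tells the driver whether to stop, whether to drop the
/// peer from the tracker, and which newly discovered peers to fold into the walk.
///
/// A minimal sketch of an implementation (illustrative only):
///
/// ```ignore
/// struct Collect {
///     seen: Vec<Peer>,
/// }
///
/// #[async_trait::async_trait]
/// impl ConvergeToLocationVisitor for Collect {
///     async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult> {
///         self.seen.push(peer.clone());
///         Ok(ConvergeToLocationVisitResult {
///             done: self.seen.len() >= 20, // stop once we have gathered enough peers
///             peer_unavailable: false,
///             peers_discovered: Vec::new(),
///         })
///     }
/// }
/// ```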
#[async_trait::async_trait]
pub trait ConvergeToLocationVisitor: Send + Sync {
    async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
}

#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
    use crate::tools::config;
    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::{DurationMillis, TimeMillis};
    use crate::tools::time_provider::time_provider::RealTimeProvider;
    use crate::tools::types::{Id, Pow};
    use crate::transport::mem_transport::MemTransportFactory;
    use std::sync::Arc;

    fn get_test_runtime_services() -> Arc<RuntimeServices> {
        Arc::new(RuntimeServices {
            time_provider: Arc::new(RealTimeProvider::default()),
            transport_factory: MemTransportFactory::default(),
            pow_generator: Arc::new(StubParallelPowGenerator::new()),
        })
    }

    #[tokio::test]
    async fn general_tests() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        assert!(peer_tracker.is_empty());
        assert_eq!(0, peer_tracker.len());

        // Don't accept a peer with insufficient PoW
        {
            loop {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
                // The generator may overshoot the requested difficulty; retry until the PoW
                // really is below the minimum.
                if server_id.pow >= config::SERVER_KEY_POW_MIN {
                    continue;
                }
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                let result = peer_tracker.add_peer(peer);
                assert!(result.is_err());
                assert_eq!(0, peer_tracker.len());
                break;
            }
        }

        // Add an individual peer
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            let result = peer_tracker.add_peer(peer);
            assert!(result.is_ok());
            assert_eq!(1, peer_tracker.len());
        }

        // Adding the same peer twice succeeds but does not create a duplicate
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            let result = peer_tracker.add_peer(peer.clone());
            assert!(result.is_ok());
            assert_eq!(2, peer_tracker.len());
            let result = peer_tracker.add_peer(peer.clone());
            assert!(result.is_ok());
            assert_eq!(2, peer_tracker.len());
        }

        // Add a peer, then remove it
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            let result = peer_tracker.add_peer(peer.clone());
            assert!(result.is_ok());
            assert_eq!(3, peer_tracker.len());
            peer_tracker.remove_peer(&peer);
            assert_eq!(2, peer_tracker.len());
        }

        // Removing an unknown peer is a no-op
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            peer_tracker.remove_peer(&peer);
            assert_eq!(2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_basics_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        // Now iterate through them all
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() {
                count += 1;
            }
            assert_eq!(NUM_PEERS, count);
        }

        // Now iterate through them, but bail after the first
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() {
                count += 1;
                break;
            }
            assert_eq!(1, count);
        }

        // Now iterate through them, but delete half of them
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                if 0 == count % 2 {
                    peer_iter.remove_peer(&peer);
                }
            }
            assert_eq!(NUM_PEERS, count);
            assert_eq!(NUM_PEERS / 2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_termination_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
            while let Some(_peer) = peer_iter.next_peer() {
                count += 1;
            }
            // The closest peer sets the high-watermark, then three more non-improving peers are
            // returned before the iterator gives up.
            assert_eq!(3 + 1, count);
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_insertions_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        // Now iterate through them, but add a few more peers
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() {
                count += 1;

                if 0 == count % 10 {
                    let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                    let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                    peer_iter.add_peers(vec![peer]);
                }

                if 50 == count {
                    break;
                }
            }

            assert_eq!(50, count);
            // 50 iterations with one peer added every 10th iteration => 5 new peers in the tracker.
            assert_eq!(NUM_PEERS + 5, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_targeting_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        // This is the peer we are actually targeting
        let target_server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
        let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;

        {
            const PEER_DISCOVERY_I: usize = 37usize;
            const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;

            // Build a location that shares at least the first 80 bits with the target peer's id,
            // so once the target is discovered it is by far the closest peer to the location.
            let bucket_location = {
                let mut location_id = target_peer.id.clone();
                for i in 10..31 {
                    location_id.0[i] = 0u8;
                }
                BucketLocation {
                    bucket_type: BucketType::User,
                    base_id: location_id.clone(),
                    duration: DurationMillis::zero(),
                    bucket_time_millis: TimeMillis::zero(),
                    location_id: location_id.clone(),
                }
            };

            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                match count {
                    PEER_DISCOVERY_I => {
                        peer_iter.add_peers(vec![target_peer.clone()]);
                    }
                    PEER_DISCOVERY_I_PLUS_1 => {
                        if peer.id != target_peer.id {
                            anyhow::bail!("peer is not the one we expected");
                        }
                        break;
                    }
                    _ => {}
                }
            }

            assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
            assert_eq!(NUM_PEERS + 1, peer_tracker.len());
        }

        Ok(())
    }

    /// Verify that `cache_radius` starts by skipping peers inside the radius, then opens up
    /// one ring per step so that closer peers are eventually visited too.
    #[tokio::test]
    async fn converge_cache_radius_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let location_id = Id::zero();

        const NUM_PEERS: usize = 100;
        let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
        for _ in 0..NUM_PEERS {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            let lab = crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0);
            labs_added.push(lab);
            peer_tracker.add_peer(peer)?;
        }
        assert_eq!(NUM_PEERS, peer_tracker.len());

        let mut sorted_labs = labs_added.clone();
        sorted_labs.sort();
        let cache_radius = sorted_labs[NUM_PEERS / 2];

        let mut labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
        while let Some((_, lab)) = peer_iter.next_peer() {
            labs_visited.push(lab);
        }

        assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");

        let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
        if has_outside_peers {
            assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
        }

        Ok(())
    }
}