Skip to main content

hashiverse_lib/client/peer_tracker/
peer_tracker.rs

//! # Kademlia-style local peer set
//!
//! `PeerTracker` is the client's working memory of the network: a deduplicated list of
//! [`crate::protocol::peer::Peer`] records loaded from `BUCKET_PEER`, validated, and
//! indexed by XOR distance so [`crate::client::peer_tracker::peer_iterator::PeerIterator`]
//! can walk them in order of closeness to any target [`crate::tools::types::Id`].
//!
//! Responsibilities:
//!
//! - **Bootstrap** — when the tracker is empty (new install, wiped storage) it calls
//!   `BootstrapV1` against seed domains from [`crate::tools::config::BOOTSTRAP_DOMAINS`]
//!   to obtain a starting set of peers.
//! - **Freshness** — stale peers (missed announces, failed RPCs) are demoted or dropped;
//!   fresh ones from gossip (`AnnounceV1`) or from RPC responses are folded in.
//! - **Flush** — peer mutations set `peers_need_flush`; the outer client batches those
//!   into periodic writes to storage so every RPC response doesn't trigger a disk write.
19use crate::anyhow_assert_eq;
20use crate::client::client_storage::client_storage;
21use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_PEER};
22use crate::client::peer_tracker::peer_iterator::PeerIterator;
23use crate::protocol::payload::payload::{BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind};
24use crate::protocol::peer::Peer;
25use crate::protocol::rpc;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::tools::LeadingAgreementBits;
28use crate::tools::types::Id;
29use crate::tools::{config, json, tools};
30use log::{info, trace, warn};
31use std::sync::Arc;
32
/// The client's local view of the known peer set and the entry point for Kademlia-style
/// routing.
///
/// `PeerTracker` owns the in-memory list of [`Peer`] records the client has seen, persists
/// them to [`ClientStorage`] under `BUCKET_PEER` so they survive restarts, and exposes the
/// iteration primitives used throughout the client when it needs to answer "who should I
/// talk to next about this [`Id`]?". When the list is empty (first launch, or after a
/// reset), it seeds itself via a `BootstrapV1` RPC against the
/// [`crate::transport::bootstrap_provider::BootstrapProvider`] addresses configured on the
/// transport.
///
/// The tracker is the single source of truth for peer freshness: stale or bad peers get
/// evicted here, new peers get folded in here, and the `peers_need_flush` flag coalesces
/// rapid updates into a single disk write.
pub struct PeerTracker {
    // Shared service handles (time provider, transport factory, pow generator) used for
    // storage timestamps and bootstrap RPCs.
    runtime_services: Arc<RuntimeServices>,
    // Backing store; the serialized peer list lives under `BUCKET_PEER` / "peers".
    client_storage: Arc<dyn ClientStorage>,
    // Dirty flag: set on peer mutations, cleared by `flush` so writes are batched.
    peers_need_flush: bool,
    // Known peers, kept sorted by id so binary search can dedupe and locate entries.
    peers: Vec<Peer>,
}
53
54impl PeerTracker {
55    pub async fn new(runtime_services: Arc<RuntimeServices>, client_storage: Arc<dyn ClientStorage>) -> anyhow::Result<Self> {
56        let peers: anyhow::Result<Vec<Peer>> = try {
57            let peers = client_storage::get_struct::<Vec<Peer>>(client_storage.as_ref(), BUCKET_PEER, "peers", runtime_services.time_provider.current_time_millis()).await?;
58            match peers {
59                Some(peers) => {
60                    info!("PeerTracker is starting with {} peers", peers.len());
61                    trace!("{:?}", peers);
62                    peers
63                }
64                None => Vec::new(),
65            }
66        };
67
68        let peers = peers.unwrap_or_else(|e| {
69            warn!("Failed to load peers from storage: {}", e);
70            Vec::new()
71        });
72
73        Ok(Self {
74            runtime_services,
75            client_storage,
76            peers_need_flush: false,
77            peers,
78        })
79    }
80
81    pub async fn flush(&mut self) -> anyhow::Result<()> {
82        if !self.peers_need_flush {
83            return Ok(());
84        }
85
86        trace!("Flushing peers to storage");
87        self.peers_need_flush = false;
88        client_storage::put_struct(self.client_storage.as_ref(), BUCKET_PEER, "peers", &self.peers, self.runtime_services.time_provider.current_time_millis()).await?;
89
90        Ok(())
91    }
92
93    pub fn add_peer(&mut self, peer: Peer) -> anyhow::Result<()> {
94        // Sanity check that this peer is kosher
95        if let Err(e) = peer.verify() {
96            anyhow::bail!("peer verification error: {}", e);
97        }
98
99        // Check that its pow is reasonable
100        if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
101            anyhow::bail!("peer peer.pow_initial.pow={} < {}", peer.pow_initial.pow, config::SERVER_KEY_POW_MIN);
102        }
103
104        let search_result = self.peers.binary_search_by_key(&peer.id, |peer| peer.id);
105        match search_result {
106            Ok(i) => {
107                // Sanity check that the ids are the same
108                assert_eq!(peer.id, self.peers[i].id);
109
110                // If the newer peer is more recent, replace the older one
111                if peer.timestamp > self.peers[i].timestamp {
112                    self.peers[i] = peer;
113                }
114            }
115            Err(i) => {
116                self.peers.insert(i, peer);
117            }
118        }
119
120        self.peers_need_flush = true;
121
122        Ok(())
123    }
124
125    pub fn remove_peer(&mut self, peer: &Peer) {
126        if let Ok(i) = self.peers.binary_search_by_key(&peer.id, |peer| peer.id) {
127            self.peers.remove(i);
128            self.peers_need_flush = true;
129        }
130    }
131
132    pub fn is_empty(&self) -> bool {
133        self.peers.is_empty()
134    }
135    pub fn len(&self) -> usize {
136        self.peers.len()
137    }
138
139    pub fn peers(&self) -> &Vec<Peer> {
140        &self.peers
141    }
142
143    pub async fn iterate_to_location(&mut self, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> anyhow::Result<PeerIterator<'_>> {
144        self.bootstrap().await?;
145
146        Ok(PeerIterator::new(self, bucket_location_id, max_iterations_since_high_watermark, cache_radius))
147    }
148
149    pub async fn bootstrap(&mut self) -> anyhow::Result<()> {
150        // We only need to bootsrap if we have noone to talk to!
151        if !self.is_empty() {
152            return Ok(());
153        }
154
155        info!("bootstrapping PeerTracker");
156
157        // Lets randomize these addresses so that the first one is not snowed
158        let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
159        tools::shuffle(&mut bootstrap_addresses);
160
161        // Our bootstrap process has handed us a bunch of raw addresses, so we need to convert them into peers
162        for bootstrap_address in bootstrap_addresses {
163            let try_result: anyhow::Result<()> = try {
164                {
165                    info!("bootstrapping {}", bootstrap_address);
166
167                    let request = json::struct_to_bytes(&BootstrapV1 {})?;
168                    let response = rpc::rpc::rpc_server_unknown(&self.runtime_services, &Id::zero(), &bootstrap_address, PayloadRequestKind::BootstrapV1, request).await?;
169                    anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &response.response_request_kind);
170                    let response = json::bytes_to_struct::<BootstrapResponseV1>(&response.bytes)?;
171                    for peer in response.peers_random {
172                        let result = self.add_peer(peer);
173                        if let Err(e) = result {
174                            warn!("problem while adding bootstrapped peer: {}", e);
175                        }
176                    }
177                }
178            };
179
180            if let Err(e) = try_result {
181                warn!("problem bootstrapping peer {}: {}", bootstrap_address, e);
182            }
183
184            // We only need to continue bootstrapping if we still have noone to talk to!
185            if !self.is_empty() {
186                break;
187            }
188        }
189
190        Ok(())
191    }
192}
193
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::tools::buckets::{generate_bucket_location, BucketLocation, BucketType, BUCKET_DURATIONS};
    use crate::tools::config;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::{DurationMillis, TimeMillis};
    use crate::tools::types::{Id, Pow};

    /// Basic add/remove/dedupe behavior of the tracker.
    #[tokio::test]
    async fn general_tests() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        assert!(peer_tracker.is_empty());
        assert_eq!(0, peer_tracker.len());

        // Don't accept insufficient pow
        {
            // We have to loop because sometimes the diminished pow actually is sufficient by chance
            loop {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;

                // Check that we haven't succeeded by statistical mistake
                if server_id.pow >= config::SERVER_KEY_POW_MIN {
                    continue;
                }

                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                let result = peer_tracker.add_peer(peer);
                assert!(result.is_err());
                assert_eq!(0, peer_tracker.len());

                break;
            }
        }

        // Add an individual
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;

            {
                let result = peer_tracker.add_peer(peer);
                assert!(result.is_ok());
                assert_eq!(1, peer_tracker.len());
            }
        }

        // Can't add an individual twice
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;

            {
                let result = peer_tracker.add_peer(peer.clone());
                assert!(result.is_ok());
                assert_eq!(2, peer_tracker.len());
            }
            {
                // Same id again: accepted but deduplicated, so the count is unchanged.
                let result = peer_tracker.add_peer(peer.clone());
                assert!(result.is_ok());
                assert_eq!(2, peer_tracker.len());
            }
        }

        // Add an individual, then remove it
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;

            {
                let result = peer_tracker.add_peer(peer.clone());
                assert!(result.is_ok());
                assert_eq!(3, peer_tracker.len());
            }

            {
                peer_tracker.remove_peer(&peer);
                assert_eq!(2, peer_tracker.len());
            }
        }

        // Remove an unknown individual — must be a harmless no-op
        {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;

            {
                peer_tracker.remove_peer(&peer);
                assert_eq!(2, peer_tracker.len());
            }
        }

        Ok(())
    }

    /// Iteration visits every peer exactly once, honors early break, and supports
    /// removal mid-iteration.
    #[tokio::test]
    async fn converge_basics_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        // configure_logging_with_time_provider("trace", runtime_services.time_provider.clone());

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        // A full walk visits every peer once
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() { count += 1; }
            assert_eq!(NUM_PEERS, count);
        }

        // Breaking out early leaves the iteration after exactly one peer
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() { count += 1; break; }
            assert_eq!(1, count);
        }

        // Removing every other peer mid-iteration still visits all of them
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                if 0 == count % 2 { peer_iter.remove_peer(&peer); }
            }
            assert_eq!(NUM_PEERS, count);
            assert_eq!(NUM_PEERS / 2, peer_tracker.len());
        }

        Ok(())
    }

    /// The high-watermark limit terminates iteration after the allowed number of
    /// non-improving steps.
    #[tokio::test]
    async fn converge_termination_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
            while let Some(_peer) = peer_iter.next_peer() { count += 1; }
            // max_iterations_since_high_watermark = 3 plus the initial high-watermark peer
            assert_eq!(3 + 1, count);
        }

        Ok(())
    }

    /// Peers added mid-iteration are folded into the walk and the tracker.
    #[tokio::test]
    async fn converge_insertions_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some(_peer) = peer_iter.next_peer() {
                count += 1;
                // Inject a fresh peer every 10th visit; 5 injections before we stop at 50.
                if 0 == count % 10 {
                    let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                    let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                    peer_iter.add_peers(vec![peer]);
                }
                if 50 == count { break; }
            }
            assert_eq!(50, count);
            assert_eq!(NUM_PEERS + 5, peer_tracker.len());
        }

        Ok(())
    }

    /// A peer much closer to the target than everything else must be visited
    /// immediately after it is discovered mid-iteration.
    #[tokio::test]
    async fn converge_targeting_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        {
            for _ in 0..NUM_PEERS {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_tracker.add_peer(peer)?;
            }
            assert_eq!(NUM_PEERS, peer_tracker.len());
        }

        let target_server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
        let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;

        {
            const PEER_DISCOVERY_I: usize = 37usize;
            const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;

            // Build a location that shares a long id prefix with the target peer, so the
            // target is overwhelmingly the closest peer once added.
            let bucket_location = {
                let mut location_id = target_peer.id.clone();
                for i in 10..31 { location_id.0[i] = 0u8; }
                BucketLocation {
                    bucket_type: BucketType::User,
                    base_id: location_id.clone(),
                    duration: DurationMillis::zero(),
                    bucket_time_millis: TimeMillis::zero(),
                    location_id: location_id.clone(),
                }
            };

            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                match count {
                    PEER_DISCOVERY_I => { peer_iter.add_peers(vec![target_peer.clone()]); }
                    PEER_DISCOVERY_I_PLUS_1 => {
                        if peer.id != target_peer.id { anyhow::bail!("peer is not the one we expected"); }
                        break;
                    }
                    _ => {}
                }
            }
            assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
            assert_eq!(NUM_PEERS + 1, peer_tracker.len());
        }

        Ok(())
    }

    /// Verify that `cache_radius` starts by skipping peers inside the radius, then opens up
    /// one ring per step so that closer peers are eventually visited too.
    #[tokio::test]
    async fn converge_cache_radius_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let location_id = Id::zero();

        const NUM_PEERS: usize = 100;
        let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
        for _ in 0..NUM_PEERS {
            let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            let lab = crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0);
            labs_added.push(lab);
            peer_tracker.add_peer(peer)?;
        }
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Pick the median agreement as the cache radius so roughly half the peers fall
        // inside it.
        let mut sorted_labs = labs_added.clone();
        sorted_labs.sort();
        let cache_radius = sorted_labs[NUM_PEERS / 2];

        let mut labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
        while let Some((_, lab)) = peer_iter.next_peer() { labs_visited.push(lab); }

        assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");

        let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
        if has_outside_peers {
            assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
        }

        Ok(())
    }
}