Skip to main content

hashiverse_lib/client/post_bundle/
post_bundle_healing.rs

1//! # Peer-to-peer reconciliation of divergent post bundles
2//!
3//! When multiple peers return different versions of the same post bundle — one
4//! might hold posts the others have missed — this module spawns a background task that
5//! sends each laggard peer the posts it's missing.
6//!
7//! The sync is a two-phase, PoW-gated protocol:
8//!
9//! 1. **Claim** (`HealPostBundleClaimV1`) — the initiator describes what it holds; the
10//!    remote server returns which post IDs it's still missing and issues a commit token
11//!    authorising the follow-up upload.
12//! 2. **Commit** (`HealPostBundleCommitV1`) — the initiator uploads just those missing
13//!    post bytes under the token.
14//!
15//! Splitting into claim + commit keeps the bandwidth-hungry upload from happening until
16//! the receiver has confirmed it actually wants the data, and it makes per-server
17//! response authentication cheap (the token is what the commit carries).
18
19use crate::anyhow_assert_eq;
20use crate::protocol::payload::payload::{HealPostBundleClaimResponseV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1, PayloadRequestKind, PayloadResponseKind};
21use crate::protocol::peer::Peer;
22use crate::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
23use crate::protocol::rpc;
24use crate::tools::buckets::BucketLocation;
25use crate::tools::json;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::types::Id;
28use bytes::Bytes;
29use log::{info, warn};
30use std::collections::HashMap;
31use std::sync::Arc;
32
33struct HealWork {
34    target_peer: Peer,
35    bucket_location: BucketLocation,
36    donor_header: EncodedPostBundleHeaderV1,
37    donor_posts_bytes: Bytes,
38    post_byte_map: HashMap<Id, (usize, usize)>,
39}
40
41/// Selects and returns the best bundle (most posts), then spawns a background task that
42/// serially heals every server whose bundle is missing posts that any other server has.
43/// Returns None if bundles is empty.
44pub fn heal_post_bundles(runtime_services: Arc<RuntimeServices>, sponsor_id: Id, bucket_location: BucketLocation, peers_visited: &[Peer], bundles: Vec<EncodedPostBundleV1>) {
45    if bundles.is_empty() {
46        return;
47    }
48
49    // Build a post_byte_map for each bundle once (offset, len per post_id)
50    let byte_maps: Vec<HashMap<Id, (usize, usize)>> = bundles
51        .iter()
52        .map(|bundle| {
53            let mut map = HashMap::new();
54            let mut offset = 0usize;
55            for (post_id, &len) in bundle.header.encoded_post_ids.iter().zip(bundle.header.encoded_post_lengths.iter()) {
56                map.insert(*post_id, (offset, len));
57                offset += len;
58            }
59            map
60        })
61        .collect();
62
63    // For each (donor, target) pair: if donor has posts that target is missing, queue a heal.
64    // Only target servers we directly visited — we should not heal servers where we have only a cached snapshot of their data.
65    // This might queue up duplicate heal tasks for the server, but the server will deny the second heal attempt.
66    let mut work: Vec<HealWork> = Vec::new();
67    for donor_idx in 0..bundles.len() {
68        for target_idx in 0..bundles.len() {
69            if donor_idx == target_idx {
70                continue;
71            }
72            let donor = &bundles[donor_idx];
73            let target = &bundles[target_idx];
74            let donor_map = &byte_maps[donor_idx];
75            if !peers_visited.iter().any(|p| p.id == target.header.peer.id) {
76                continue;
77            }
78            if donor.header.encoded_post_ids.iter().any(|id| !target.header.encoded_post_ids.contains(id)) {
79                info!("Queueing healing for target_peer={} bucket_location={}", target.header.peer, bucket_location);
80                work.push(HealWork {
81                    target_peer: target.header.peer.clone(),
82                    bucket_location: bucket_location.clone(),
83                    donor_header: donor.header.clone(),
84                    donor_posts_bytes: donor.encoded_posts_bytes.clone(),
85                    post_byte_map: donor_map.clone(),
86                });
87            }
88        }
89    }
90
91    if !work.is_empty() {
92        crate::tools::tools::spawn_background_task(async move {
93            for task in work {
94                let result = try_heal_single_bundle(&runtime_services, &sponsor_id, &task.target_peer, task.bucket_location, task.donor_header, &task.donor_posts_bytes, &task.post_byte_map).await;
95                if let Err(e) = result {
96                    warn!("Healing failed for target_peer={}: {}", task.target_peer, e);
97                }
98            }
99        });
100    }
101}
102
103async fn try_heal_single_bundle(
104    runtime_services: &RuntimeServices,
105    sponsor_id: &Id,
106    target_peer: &Peer,
107    bucket_location: BucketLocation,
108    donor_header: EncodedPostBundleHeaderV1,
109    donor_posts_bytes: &[u8],
110    post_byte_map: &HashMap<Id, (usize, usize)>,
111) -> anyhow::Result<()> {
112    info!("About to heal {}", target_peer);
113
114    // Phase 1: Claim — ask the server which posts it needs from this donor bundle
115    let request_bytes = HealPostBundleClaimV1::new_to_bytes(&donor_header, &bucket_location)?;
116    let response = rpc::rpc::rpc_server_known(runtime_services, sponsor_id, target_peer, PayloadRequestKind::HealPostBundleClaimV1, request_bytes).await?;
117    anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleClaimResponseV1, &response.response_request_kind);
118    let response = json::bytes_to_struct::<HealPostBundleClaimResponseV1>(&response.bytes)?;
119
120    let Some(token) = response.token
121    else {
122        info!("Healing skipped because of no HealPostBundleClaimTokenV1 for {}", target_peer);
123        return Ok(()); // Server is up to date or is busy with another heal
124    };
125
126    // Assemble the bytes for the posts the server needs, in token order
127    let mut encoded_posts_bytes = Vec::new();
128    for post_id in &token.needed_post_ids {
129        let &(offset, len) = post_byte_map.get(post_id).ok_or_else(|| anyhow::anyhow!("needed post {} not in donor bundle", post_id))?;
130        encoded_posts_bytes.extend_from_slice(&donor_posts_bytes[offset..offset + len]);
131    }
132
133    // Phase 2: Commit — send the missing post bytes
134    let request_bytes = HealPostBundleCommitV1::new_to_bytes(&token, &donor_header, &encoded_posts_bytes)?;
135    let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, target_peer, PayloadRequestKind::HealPostBundleCommitV1, request_bytes).await?;
136    anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleCommitResponseV1, &response.response_request_kind);
137    let response = json::bytes_to_struct::<HealPostBundleCommitResponseV1>(&response.bytes)?;
138
139    if response.accepted {
140        info!("Healed {} post(s) on {} from {}", token.needed_post_ids.len(), target_peer, donor_header.peer);
141    }
142    else {
143        info!("Healing not accepted  for {} post(s) on {} from {}", token.needed_post_ids.len(), target_peer, donor_header.peer);
144    }
145
146    Ok(())
147}