hashiverse_lib/client/post_bundle/
post_bundle_healing.rs1use crate::anyhow_assert_eq;
20use crate::protocol::payload::payload::{HealPostBundleClaimResponseV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1, PayloadRequestKind, PayloadResponseKind};
21use crate::protocol::peer::Peer;
22use crate::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
23use crate::protocol::rpc;
24use crate::tools::buckets::BucketLocation;
25use crate::tools::json;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::types::Id;
28use bytes::Bytes;
29use log::{info, warn};
30use std::collections::HashMap;
31use std::sync::Arc;
32
33struct HealWork {
34 target_peer: Peer,
35 bucket_location: BucketLocation,
36 donor_header: EncodedPostBundleHeaderV1,
37 donor_posts_bytes: Bytes,
38 post_byte_map: HashMap<Id, (usize, usize)>,
39}
40
41pub fn heal_post_bundles(runtime_services: Arc<RuntimeServices>, sponsor_id: Id, bucket_location: BucketLocation, peers_visited: &[Peer], bundles: Vec<EncodedPostBundleV1>) {
45 if bundles.is_empty() {
46 return;
47 }
48
49 let byte_maps: Vec<HashMap<Id, (usize, usize)>> = bundles
51 .iter()
52 .map(|bundle| {
53 let mut map = HashMap::new();
54 let mut offset = 0usize;
55 for (post_id, &len) in bundle.header.encoded_post_ids.iter().zip(bundle.header.encoded_post_lengths.iter()) {
56 map.insert(*post_id, (offset, len));
57 offset += len;
58 }
59 map
60 })
61 .collect();
62
63 let mut work: Vec<HealWork> = Vec::new();
67 for donor_idx in 0..bundles.len() {
68 for target_idx in 0..bundles.len() {
69 if donor_idx == target_idx {
70 continue;
71 }
72 let donor = &bundles[donor_idx];
73 let target = &bundles[target_idx];
74 let donor_map = &byte_maps[donor_idx];
75 if !peers_visited.iter().any(|p| p.id == target.header.peer.id) {
76 continue;
77 }
78 if donor.header.encoded_post_ids.iter().any(|id| !target.header.encoded_post_ids.contains(id)) {
79 info!("Queueing healing for target_peer={} bucket_location={}", target.header.peer, bucket_location);
80 work.push(HealWork {
81 target_peer: target.header.peer.clone(),
82 bucket_location: bucket_location.clone(),
83 donor_header: donor.header.clone(),
84 donor_posts_bytes: donor.encoded_posts_bytes.clone(),
85 post_byte_map: donor_map.clone(),
86 });
87 }
88 }
89 }
90
91 if !work.is_empty() {
92 crate::tools::tools::spawn_background_task(async move {
93 for task in work {
94 let result = try_heal_single_bundle(&runtime_services, &sponsor_id, &task.target_peer, task.bucket_location, task.donor_header, &task.donor_posts_bytes, &task.post_byte_map).await;
95 if let Err(e) = result {
96 warn!("Healing failed for target_peer={}: {}", task.target_peer, e);
97 }
98 }
99 });
100 }
101}
102
103async fn try_heal_single_bundle(
104 runtime_services: &RuntimeServices,
105 sponsor_id: &Id,
106 target_peer: &Peer,
107 bucket_location: BucketLocation,
108 donor_header: EncodedPostBundleHeaderV1,
109 donor_posts_bytes: &[u8],
110 post_byte_map: &HashMap<Id, (usize, usize)>,
111) -> anyhow::Result<()> {
112 info!("About to heal {}", target_peer);
113
114 let request_bytes = HealPostBundleClaimV1::new_to_bytes(&donor_header, &bucket_location)?;
116 let response = rpc::rpc::rpc_server_known(runtime_services, sponsor_id, target_peer, PayloadRequestKind::HealPostBundleClaimV1, request_bytes).await?;
117 anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleClaimResponseV1, &response.response_request_kind);
118 let response = json::bytes_to_struct::<HealPostBundleClaimResponseV1>(&response.bytes)?;
119
120 let Some(token) = response.token
121 else {
122 info!("Healing skipped because of no HealPostBundleClaimTokenV1 for {}", target_peer);
123 return Ok(()); };
125
126 let mut encoded_posts_bytes = Vec::new();
128 for post_id in &token.needed_post_ids {
129 let &(offset, len) = post_byte_map.get(post_id).ok_or_else(|| anyhow::anyhow!("needed post {} not in donor bundle", post_id))?;
130 encoded_posts_bytes.extend_from_slice(&donor_posts_bytes[offset..offset + len]);
131 }
132
133 let request_bytes = HealPostBundleCommitV1::new_to_bytes(&token, &donor_header, &encoded_posts_bytes)?;
135 let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, target_peer, PayloadRequestKind::HealPostBundleCommitV1, request_bytes).await?;
136 anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleCommitResponseV1, &response.response_request_kind);
137 let response = json::bytes_to_struct::<HealPostBundleCommitResponseV1>(&response.bytes)?;
138
139 if response.accepted {
140 info!("Healed {} post(s) on {} from {}", token.needed_post_ids.len(), target_peer, donor_header.peer);
141 }
142 else {
143 info!("Healing not accepted for {} post(s) on {} from {}", token.needed_post_ids.len(), target_peer, donor_header.peer);
144 }
145
146 Ok(())
147}