Skip to main content

hashiverse_lib/client/post_bundle/
post_bundle_feedback_healing.rs

1//! # Peer-to-peer reconciliation of post-bundle feedback
2//!
3//! Analogue of [`crate::client::post_bundle::post_bundle_healing`] for the feedback
4//! layer. After a fetch has pulled feedback from several peers, this module merges them
5//! into a best-known view and spawns background `HealPostBundleFeedbackV1` RPCs to each
6//! peer whose copy is missing reactions or has weaker per-client signed entries. No
7//! two-phase token is needed for feedback because the payload is small and fully
8//! self-describing.
9
10use std::sync::Arc;
11use crate::anyhow_assert_eq;
12use crate::protocol::payload::payload::{HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind};
13use crate::protocol::peer::Peer;
14use crate::protocol::posting::encoded_post_bundle_feedback::EncodedPostBundleFeedbackV1;
15use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1};
16use crate::protocol::rpc;
17use crate::tools::runtime_services::RuntimeServices;
18use crate::tools::types::{Id, Salt};
19use log::{info, warn};
20
21// For each bundle/peer, collect feedbacks that are missing or weaker than the global max
22struct HealWork {
23    target_peer: Peer,
24    location_id: Id,
25    feedbacks_to_send: Vec<EncodedPostFeedbackV1>,
26}
27
28
29/// Spawns a background task that sends each peer the feedbacks it is missing
30/// relative to `merged` (the pre-computed union of all peer bundles).
31/// Does nothing if there are fewer than 2 bundles or merged feedbacks are empty.
32pub fn heal_post_bundle_feedbacks(
33    runtime_services: Arc<RuntimeServices>,
34    sponsor_id: Id,
35    peers_visited: &[Peer],
36    bundles: &[EncodedPostBundleFeedbackV1],
37    best_encoded_post_bundle_feedback: &EncodedPostBundleFeedbackV1,
38) {
39    // Nothing to heal with a single bundle
40    if bundles.len() < 2 || best_encoded_post_bundle_feedback.feedbacks_bytes.is_empty() {
41        return;
42    }
43
44    let mut work: Vec<HealWork> = Vec::new();
45    for bundle in bundles {
46        if !peers_visited.iter().any(|p| p.id == bundle.header.peer.id) {
47            continue;
48        }
49        let mut feedbacks_to_send = Vec::new();
50        for view in EncodedPostFeedbackViewV1::iter(&best_encoded_post_bundle_feedback.feedbacks_bytes) {
51            let Ok(view) = view else { continue };
52            let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
53            let peer_pow = bundle.get_post_pow_for_feedback_type(&post_id, view.feedback_type());
54            if view.pow() > peer_pow {
55                feedbacks_to_send.push(EncodedPostFeedbackV1 {
56                    post_id,
57                    feedback_type: view.feedback_type(),
58                    salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
59                    pow: view.pow(),
60                });
61            }
62        }
63        if !feedbacks_to_send.is_empty() {
64            work.push(HealWork {
65                target_peer: bundle.header.peer.clone(),
66                location_id: bundle.header.location_id,
67                feedbacks_to_send,
68            });
69        }
70    }
71
72    if work.is_empty() {
73        return;
74    }
75
76    crate::tools::tools::spawn_background_task(async move {
77        for task in work {
78            let result = try_heal_single_bundle_feedback(
79                &runtime_services,
80                &sponsor_id,
81                &task.target_peer,
82                task.location_id,
83                &task.feedbacks_to_send,
84            )
85            .await;
86            if let Err(e) = result {
87                warn!("Healing post bundle feedback for {} failed: {}", task.target_peer, e);
88            }
89        }
90    });
91}
92
93async fn try_heal_single_bundle_feedback(
94    runtime_services: &RuntimeServices,
95    sponsor_id: &Id,
96    target_peer: &Peer,
97    location_id: Id,
98    feedbacks_to_send: &[EncodedPostFeedbackV1],
99) -> anyhow::Result<()> {
100    let request_bytes = HealPostBundleFeedbackV1::new_to_bytes(&location_id, feedbacks_to_send)?;
101    let response = rpc::rpc::rpc_server_known(
102        runtime_services,
103        sponsor_id,
104        target_peer,
105        PayloadRequestKind::HealPostBundleFeedbackV1,
106        request_bytes,
107    )
108    .await?;
109    anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleFeedbackResponseV1, &response.response_request_kind);
110    let response = crate::tools::json::bytes_to_struct::<HealPostBundleFeedbackResponseV1>(&response.bytes)?;
111
112    if response.accepted_count > 0 {
113        info!("Healed {} feedback(s) on {}", response.accepted_count, target_peer);
114    }
115
116    Ok(())
117}