hashiverse_lib/client/post_bundle/
post_bundle_feedback_healing.rs1use std::sync::Arc;
11use crate::anyhow_assert_eq;
12use crate::protocol::payload::payload::{HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind};
13use crate::protocol::peer::Peer;
14use crate::protocol::posting::encoded_post_bundle_feedback::EncodedPostBundleFeedbackV1;
15use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1};
16use crate::protocol::rpc;
17use crate::tools::runtime_services::RuntimeServices;
18use crate::tools::types::{Id, Salt};
19use log::{info, warn};
20
21struct HealWork {
23 target_peer: Peer,
24 location_id: Id,
25 feedbacks_to_send: Vec<EncodedPostFeedbackV1>,
26}
27
28
29pub fn heal_post_bundle_feedbacks(
33 runtime_services: Arc<RuntimeServices>,
34 sponsor_id: Id,
35 peers_visited: &[Peer],
36 bundles: &[EncodedPostBundleFeedbackV1],
37 best_encoded_post_bundle_feedback: &EncodedPostBundleFeedbackV1,
38) {
39 if bundles.len() < 2 || best_encoded_post_bundle_feedback.feedbacks_bytes.is_empty() {
41 return;
42 }
43
44 let mut work: Vec<HealWork> = Vec::new();
45 for bundle in bundles {
46 if !peers_visited.iter().any(|p| p.id == bundle.header.peer.id) {
47 continue;
48 }
49 let mut feedbacks_to_send = Vec::new();
50 for view in EncodedPostFeedbackViewV1::iter(&best_encoded_post_bundle_feedback.feedbacks_bytes) {
51 let Ok(view) = view else { continue };
52 let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
53 let peer_pow = bundle.get_post_pow_for_feedback_type(&post_id, view.feedback_type());
54 if view.pow() > peer_pow {
55 feedbacks_to_send.push(EncodedPostFeedbackV1 {
56 post_id,
57 feedback_type: view.feedback_type(),
58 salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
59 pow: view.pow(),
60 });
61 }
62 }
63 if !feedbacks_to_send.is_empty() {
64 work.push(HealWork {
65 target_peer: bundle.header.peer.clone(),
66 location_id: bundle.header.location_id,
67 feedbacks_to_send,
68 });
69 }
70 }
71
72 if work.is_empty() {
73 return;
74 }
75
76 crate::tools::tools::spawn_background_task(async move {
77 for task in work {
78 let result = try_heal_single_bundle_feedback(
79 &runtime_services,
80 &sponsor_id,
81 &task.target_peer,
82 task.location_id,
83 &task.feedbacks_to_send,
84 )
85 .await;
86 if let Err(e) = result {
87 warn!("Healing post bundle feedback for {} failed: {}", task.target_peer, e);
88 }
89 }
90 });
91}
92
93async fn try_heal_single_bundle_feedback(
94 runtime_services: &RuntimeServices,
95 sponsor_id: &Id,
96 target_peer: &Peer,
97 location_id: Id,
98 feedbacks_to_send: &[EncodedPostFeedbackV1],
99) -> anyhow::Result<()> {
100 let request_bytes = HealPostBundleFeedbackV1::new_to_bytes(&location_id, feedbacks_to_send)?;
101 let response = rpc::rpc::rpc_server_known(
102 runtime_services,
103 sponsor_id,
104 target_peer,
105 PayloadRequestKind::HealPostBundleFeedbackV1,
106 request_bytes,
107 )
108 .await?;
109 anyhow_assert_eq!(&PayloadResponseKind::HealPostBundleFeedbackResponseV1, &response.response_request_kind);
110 let response = crate::tools::json::bytes_to_struct::<HealPostBundleFeedbackResponseV1>(&response.bytes)?;
111
112 if response.accepted_count > 0 {
113 info!("Healed {} feedback(s) on {}", response.accepted_count, target_peer);
114 }
115
116 Ok(())
117}