Skip to main content

hashiverse_lib/client/post_bundle/
live_post_bundle_feedback_manager.rs

1//! # Production [`PostBundleFeedbackManager`] — cache, fetch, heal
2//!
3//! Mirrors
4//! [`crate::client::post_bundle::live_post_bundle_manager`] but for feedback: checks
5//! `BUCKET_POST_BUNDLE_FEEDBACK` in
6//! [`crate::client::client_storage::client_storage::ClientStorage`] (with an age-based
7//! TTL since feedback mutates), falls back to `GetPostBundleFeedbackV1` against DHT
8//! peers, and spawns
9//! [`crate::client::post_bundle::post_bundle_feedback_healing`] in the background to
10//! reconcile divergent copies. Cache propagation is guided by the same
11//! [`crate::client::caching::cache_radius_tracker`] used by the post-bundle side.
12
13use crate::anyhow_assert_eq;
14use crate::client::caching::cache_radius_tracker::CacheRadiusTracker;
15use crate::client::caching::post_bundle_cache_uploader;
16use crate::client::client_storage::client_storage::{ClientStorage, BUCKET_POST_BUNDLE_FEEDBACK};
17use crate::client::peer_tracker::peer_tracker::PeerTracker;
18use crate::client::post_bundle::post_bundle_feedback_healing;
19use crate::client::post_bundle::post_bundle_feedback_manager::PostBundleFeedbackManager;
20use crate::protocol::payload::payload::{CacheRequestTokenV1, GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind};
21use crate::protocol::posting::encoded_post_bundle_feedback::EncodedPostBundleFeedbackV1;
22use crate::protocol::rpc;
23use crate::tools::buckets::BucketLocation;
24use crate::tools::config::{CLIENT_POST_BUNDLE_FEEDBACK_CACHE_DURATION};
25use crate::tools::runtime_services::RuntimeServices;
26use crate::tools::time::TimeMillis;
27use crate::tools::tools::LeadingAgreementBits;
28use crate::tools::types::Id;
29use crate::tools::{config, json};
30use bytes::Bytes;
31use log::{info, trace, warn};
32use scopeguard::defer;
33use std::collections::HashMap;
34use std::sync::Arc;
35use tokio::sync::{Mutex, RwLock};
36
37pub struct LivePostBundleFeedbackManager {
38    runtime_services: Arc<RuntimeServices>,
39    client_storage: Arc<dyn ClientStorage>,
40    peer_tracker: Arc<RwLock<PeerTracker>>,
41    sponsor_id: Id,
42    post_bundle_feedback_inflight: Mutex<HashMap<Id, Arc<Mutex<()>>>>,
43    post_bundle_feedback_cache_radius_tracker: CacheRadiusTracker,
44}
45
46impl LivePostBundleFeedbackManager {
47    pub fn new(runtime_services: Arc<RuntimeServices>, sponsor_id: Id, client_storage: Arc<dyn ClientStorage>, peer_tracker: Arc<RwLock<PeerTracker>>) -> Self {
48        Self {
49            runtime_services,
50            client_storage,
51            peer_tracker,
52            sponsor_id,
53            post_bundle_feedback_inflight: Mutex::new(HashMap::new()),
54            post_bundle_feedback_cache_radius_tracker: CacheRadiusTracker::new(CLIENT_POST_BUNDLE_FEEDBACK_CACHE_DURATION.const_mul(5)), // We let these cache radius caches last longer than our own local cache of the bundles...
55        }
56    }
57}
58
59#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
60#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
61impl PostBundleFeedbackManager for LivePostBundleFeedbackManager {
62    async fn get_post_bundle_feedback(&self, bucket_location: BucketLocation, time_millis: TimeMillis) -> anyhow::Result<EncodedPostBundleFeedbackV1> {
63        let post_bundle_location_id = bucket_location.location_id;
64        let get_from_cache = async |location_id: Id, time_millis: TimeMillis, reason: &str| -> Option<EncodedPostBundleFeedbackV1> {
65            let result: anyhow::Result<Option<EncodedPostBundleFeedbackV1>> = try {
66                let raw = self.client_storage.get(BUCKET_POST_BUNDLE_FEEDBACK, &location_id.to_hex_str(), time_millis).await?;
67                if let Some(raw) = raw {
68                    let feedback = EncodedPostBundleFeedbackV1::from_bytes(Bytes::from(raw))?;
69                    let duration = time_millis - feedback.header.time_millis;
70                    if duration < CLIENT_POST_BUNDLE_FEEDBACK_CACHE_DURATION {
71                        trace!("Using cached PostBundleFeedback for {} (age {}) at {}", location_id, duration, reason);
72                        Some(feedback)
73                    } else {
74                        trace!("Cached PostBundleFeedback for {} expired (age {}) at {}", location_id, duration, reason);
75                        None
76                    }
77                } else {
78                    None
79                }
80            };
81
82            result.unwrap_or_else(|e| {
83                warn!("discarding problematic cached PostBundleFeedback: {}", e);
84                None
85            })
86        };
87
88        // Check the cache
89        if let Some(cached) = get_from_cache(post_bundle_location_id, time_millis, "preflight").await {
90            return Ok(cached);
91        }
92
93        // Inflight deduplication: get or create a per-key mutex
94        let key_lock = {
95            let mut inflight = self.post_bundle_feedback_inflight.lock().await;
96            inflight.entry(post_bundle_location_id).or_insert_with(|| Arc::new(Mutex::new(()))).clone()
97        };
98        let _inflight_guard = key_lock.lock().await;
99        defer!(if let Ok(mut m) = self.post_bundle_feedback_inflight.try_lock() { m.remove(&post_bundle_location_id); });
100
101        // Re-check the cache - a concurrent caller may have just finished fetching and stored the result.
102        if let Some(cached) = get_from_cache(post_bundle_location_id, time_millis, "postflight").await {
103            return Ok(cached);
104        }
105
106        let cache_radius = self.post_bundle_feedback_cache_radius_tracker.get(post_bundle_location_id, time_millis);
107
108        // We need to refresh...
109        let mut peers_visited = Vec::new();
110        let mut already_retrieved_peer_ids: Vec<Id> = Vec::new();
111        let mut encoded_post_bundle_feedbacks = Vec::new();
112        let mut cache_request_tokens: Vec<CacheRequestTokenV1> = Vec::new();
113        let mut positive_responder_leading_agreement_bits: Vec<LeadingAgreementBits> = Vec::new();
114        {
115            let mut peer_tracker = self.peer_tracker.write().await;
116            let mut peer_iter = peer_tracker.iterate_to_location(post_bundle_location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, cache_radius).await?;
117            while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
118                if already_retrieved_peer_ids.contains(&peer.id) {
119                    continue;
120                }
121
122                let result: anyhow::Result<()> = try {
123                    info!("Requesting PostBundleFeedback with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
124
125                    let request = GetPostBundleFeedbackV1 {
126                        bucket_location: bucket_location.clone(),
127                        peers_visited: peers_visited.clone(),
128                        already_retrieved_peer_ids: already_retrieved_peer_ids.clone(),
129                    };
130                    let request = json::struct_to_bytes(&request)?;
131                    let response = rpc::rpc::rpc_server_known(&self.runtime_services, &self.sponsor_id, &peer, PayloadRequestKind::GetPostBundleFeedbackV1, request).await?;
132                    anyhow_assert_eq!(&PayloadResponseKind::GetPostBundleFeedbackResponseV1, &response.response_request_kind);
133                    let response = GetPostBundleFeedbackResponseV1::from_bytes(response.bytes)?;
134
135                    peers_visited.push(peer.clone());
136                    peer_iter.add_peers(response.peers_nearer);
137
138                    if let Some(token) = response.cache_request_token {
139                        cache_request_tokens.push(token);
140                    }
141
142                    let mut found_bundle_from_this_peer = false;
143
144                    for cached_bytes in response.post_bundle_feedbacks_cached {
145                        let process_result: anyhow::Result<()> = try {
146                            let post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(cached_bytes)?;
147                            post_bundle_feedback.header.verify()?;
148                            trace!("Retrieved cached PostBundleFeedback from post_bundle_location_id={} with length {}", post_bundle_location_id, post_bundle_feedback.feedbacks_bytes.len());
149
150                            already_retrieved_peer_ids.push(post_bundle_feedback.header.peer.id);
151                            encoded_post_bundle_feedbacks.push(post_bundle_feedback);
152                            found_bundle_from_this_peer = true;
153                        };
154                        if let Err(e) = process_result {
155                            warn!("Error processing cached PostBundleFeedback: {}", e);
156                        }
157                    }
158
159                    if let Some(post_bundle_feedback_raw) = response.encoded_post_bundle_feedback {
160                        let post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(post_bundle_feedback_raw)?;
161                        post_bundle_feedback.header.verify()?;
162                        trace!("Retrieved PostBundleFeedback from post_bundle_location_id={} with length {}", post_bundle_location_id, post_bundle_feedback.feedbacks_bytes.len());
163
164                        already_retrieved_peer_ids.push(post_bundle_feedback.header.peer.id);
165                        encoded_post_bundle_feedbacks.push(post_bundle_feedback);
166                        found_bundle_from_this_peer = true;
167                    }
168
169                    if found_bundle_from_this_peer {
170                        positive_responder_leading_agreement_bits.push(leading_agreement_bits);
171                    }
172
173                    if encoded_post_bundle_feedbacks.len() >= config::REDUNDANT_SERVERS_PER_POST {
174                        break;
175                    }
176                };
177
178                if let Err(e) = result {
179                    warn!("Error retrieving PostBundleFeedback from peer {}: {}", peer, e);
180                    peer_iter.remove_peer(&peer);
181                }
182            }
183
184            let result = peer_tracker.flush().await;
185            if let Err(e) = result {
186                warn!("Error flushing peer tracker: {}", e);
187            }
188        }
189
190        trace!("Discovered {} post bundle feedbacks after visiting {} peers", encoded_post_bundle_feedbacks.len(), peers_visited.len());
191
192        // Store our new cache radius - the peer "furthest out" from our bundle's location_id
193        if let Some(new_radius) = positive_responder_leading_agreement_bits.iter().copied().min() {
194            self.post_bundle_feedback_cache_radius_tracker.update(post_bundle_location_id, new_radius, time_millis);
195        }
196
197        // Merge all collected bundles, taking the best-pow entry per (post_id, feedback_type).
198        let best_encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::merge(&encoded_post_bundle_feedbacks)
199            .ok_or_else(|| anyhow::anyhow!("No post bundle feedbacks discovered for {}", post_bundle_location_id))?;
200
201        // Cache it
202        trace!("Caching PostBundleFeedback from post_bundle_location_id={} with length {}", post_bundle_location_id, best_encoded_post_bundle_feedback.feedbacks_bytes.len());
203        let encoded_post_bundle_feedback_bytes = best_encoded_post_bundle_feedback.to_bytes()?;
204        self.client_storage.put(BUCKET_POST_BUNDLE_FEEDBACK, &post_bundle_location_id.to_hex_str(), encoded_post_bundle_feedback_bytes.to_vec(), time_millis).await?;
205
206        // Kick off healing
207        post_bundle_feedback_healing::heal_post_bundle_feedbacks(
208            self.runtime_services.clone(),
209            self.sponsor_id,
210            &peers_visited,
211            &encoded_post_bundle_feedbacks,
212            &best_encoded_post_bundle_feedback,
213        );
214
215        // Kick off remote cache populating
216        post_bundle_cache_uploader::upload_post_bundle_feedback_caches(
217            self.runtime_services.clone(),
218            self.sponsor_id,
219            cache_request_tokens,
220            encoded_post_bundle_feedback_bytes,
221        );
222
223        Ok(best_encoded_post_bundle_feedback)
224    }
225}