Skip to main content

hashiverse_lib/client/caching/
post_bundle_cache_uploader.rs

1//! # Fire-and-forget cache replication
2//!
3//! After a successful post or feedback submission, the server returns a set of
4//! `CachePostBundle[Feedback]Token` values that authorise the client to forward the
5//! freshly-written bundle to *other* servers for caching. This module spawns those
6//! forwards as background tasks so the original submission can return to the user
7//! without blocking on network propagation.
8//!
9//! Bundles that already carry the remote server as an originator are filtered out —
10//! there's no point re-uploading data to a server we already know holds it.
11
12use std::sync::Arc;
13use bytes::Bytes;
14use crate::anyhow_assert_eq;
15use crate::protocol::payload::payload::{
16    CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1,
17    CachePostBundleV1, CacheRequestTokenV1, PayloadRequestKind, PayloadResponseKind,
18};
19use crate::protocol::rpc;
20use crate::tools::runtime_services::RuntimeServices;
21use crate::tools::tools::spawn_background_task;
22use crate::tools::types::Id;
23use crate::tools::json;
24use log::warn;
25
26/// Fire-and-forget: for each token, uploads all collected bundles whose originator is not
27/// already cached at the token's server.
28pub fn upload_post_bundle_caches(
29    runtime_services: Arc<RuntimeServices>,
30    sponsor_id: Id,
31    tokens: Vec<CacheRequestTokenV1>,
32    bundles: Vec<(Id, Bytes)>,
33) {
34    if tokens.is_empty() || bundles.is_empty() {
35        return;
36    }
37
38    spawn_background_task(async move {
39        for token in &tokens {
40            let now = runtime_services.time_provider.current_time_millis();
41            if token.is_expired(now) {
42                continue;
43            }
44
45            let filtered: Vec<&[u8]> = bundles
46                .iter()
47                .filter(|(originator_id, _)| !token.already_cached_peer_ids.contains(originator_id))
48                .map(|(_, bytes)| bytes.as_ref())
49                .collect();
50
51            if filtered.is_empty() {
52                continue;
53            }
54
55            let result: anyhow::Result<()> = async {
56                let request_bytes = CachePostBundleV1::new_to_bytes(token, &filtered)?;
57                let response = rpc::rpc::rpc_server_known(
58                    &runtime_services,
59                    &sponsor_id,
60                    &token.peer,
61                    PayloadRequestKind::CachePostBundleV1,
62                    request_bytes,
63                )
64                .await?;
65                anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleResponseV1, &response.response_request_kind);
66                json::bytes_to_struct::<CachePostBundleResponseV1>(&response.bytes)?;
67                Ok(())
68            }
69            .await;
70
71            if let Err(e) = result {
72                warn!("Failed to upload post bundle cache to {}: {}", token.peer, e);
73            }
74        }
75    });
76}
77
78/// Fire-and-forget: uploads the single combined-best feedback to every token's server.
79pub fn upload_post_bundle_feedback_caches(
80    runtime_services: Arc<RuntimeServices>,
81    sponsor_id: Id,
82    tokens: Vec<CacheRequestTokenV1>,
83    feedback_bytes: Bytes,
84) {
85    if tokens.is_empty() || feedback_bytes.is_empty() {
86        return;
87    }
88
89    spawn_background_task(async move {
90        for token in &tokens {
91            let now = runtime_services.time_provider.current_time_millis();
92            if token.is_expired(now) {
93                continue;
94            }
95
96            {
97                let result: anyhow::Result<()> = async {
98                    let request_bytes = CachePostBundleFeedbackV1::new_to_bytes(token, &feedback_bytes)?;
99                    let response = rpc::rpc::rpc_server_known(
100                        &runtime_services,
101                        &sponsor_id,
102                        &token.peer,
103                        PayloadRequestKind::CachePostBundleFeedbackV1,
104                        request_bytes,
105                    )
106                    .await?;
107                    anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleFeedbackResponseV1, &response.response_request_kind);
108                    json::bytes_to_struct::<CachePostBundleFeedbackResponseV1>(&response.bytes)?;
109                    Ok(())
110                }
111                .await;
112
113                if let Err(e) = result {
114                    warn!("Failed to upload post bundle feedback cache to {}: {}", token.peer, e);
115                }
116            }
117        }
118    });
119}