hashiverse_lib/client/caching/
post_bundle_cache_uploader.rs1use std::sync::Arc;
13use bytes::Bytes;
14use crate::anyhow_assert_eq;
15use crate::protocol::payload::payload::{
16 CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1,
17 CachePostBundleV1, CacheRequestTokenV1, PayloadRequestKind, PayloadResponseKind,
18};
19use crate::protocol::rpc;
20use crate::tools::runtime_services::RuntimeServices;
21use crate::tools::tools::spawn_background_task;
22use crate::tools::types::Id;
23use crate::tools::json;
24use log::warn;
25
26pub fn upload_post_bundle_caches(
29 runtime_services: Arc<RuntimeServices>,
30 sponsor_id: Id,
31 tokens: Vec<CacheRequestTokenV1>,
32 bundles: Vec<(Id, Bytes)>,
33) {
34 if tokens.is_empty() || bundles.is_empty() {
35 return;
36 }
37
38 spawn_background_task(async move {
39 for token in &tokens {
40 let now = runtime_services.time_provider.current_time_millis();
41 if token.is_expired(now) {
42 continue;
43 }
44
45 let filtered: Vec<&[u8]> = bundles
46 .iter()
47 .filter(|(originator_id, _)| !token.already_cached_peer_ids.contains(originator_id))
48 .map(|(_, bytes)| bytes.as_ref())
49 .collect();
50
51 if filtered.is_empty() {
52 continue;
53 }
54
55 let result: anyhow::Result<()> = async {
56 let request_bytes = CachePostBundleV1::new_to_bytes(token, &filtered)?;
57 let response = rpc::rpc::rpc_server_known(
58 &runtime_services,
59 &sponsor_id,
60 &token.peer,
61 PayloadRequestKind::CachePostBundleV1,
62 request_bytes,
63 )
64 .await?;
65 anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleResponseV1, &response.response_request_kind);
66 json::bytes_to_struct::<CachePostBundleResponseV1>(&response.bytes)?;
67 Ok(())
68 }
69 .await;
70
71 if let Err(e) = result {
72 warn!("Failed to upload post bundle cache to {}: {}", token.peer, e);
73 }
74 }
75 });
76}
77
78pub fn upload_post_bundle_feedback_caches(
80 runtime_services: Arc<RuntimeServices>,
81 sponsor_id: Id,
82 tokens: Vec<CacheRequestTokenV1>,
83 feedback_bytes: Bytes,
84) {
85 if tokens.is_empty() || feedback_bytes.is_empty() {
86 return;
87 }
88
89 spawn_background_task(async move {
90 for token in &tokens {
91 let now = runtime_services.time_provider.current_time_millis();
92 if token.is_expired(now) {
93 continue;
94 }
95
96 {
97 let result: anyhow::Result<()> = async {
98 let request_bytes = CachePostBundleFeedbackV1::new_to_bytes(token, &feedback_bytes)?;
99 let response = rpc::rpc::rpc_server_known(
100 &runtime_services,
101 &sponsor_id,
102 &token.peer,
103 PayloadRequestKind::CachePostBundleFeedbackV1,
104 request_bytes,
105 )
106 .await?;
107 anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleFeedbackResponseV1, &response.response_request_kind);
108 json::bytes_to_struct::<CachePostBundleFeedbackResponseV1>(&response.bytes)?;
109 Ok(())
110 }
111 .await;
112
113 if let Err(e) = result {
114 warn!("Failed to upload post bundle feedback cache to {}: {}", token.peer, e);
115 }
116 }
117 }
118 });
119}