hashiverse_lib/client/post_bundle/
posting.rs1use crate::anyhow_assert_eq;
17use crate::client::peer_tracker::peer_tracker::PeerTracker;
18use crate::protocol::posting::encoded_post::{EncodedPostBytesV1, EncodedPostV1};
19use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind, SubmitPostClaimResponseV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1};
20use crate::protocol::rpc;
21use crate::tools::buckets::BucketLocation;
22use crate::tools::{config, json};
23use crate::tools::runtime_services::RuntimeServices;
24use log::{info, warn};
25use std::sync::Arc;
26use tokio::sync::RwLock;
27use crate::protocol::posting::amplification;
28use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
29use crate::tools::types::Id;
30
31pub async fn post_to_location(
32 runtime_services: &RuntimeServices,
33 sponsor_id: &Id,
34 peer_tracker: &Arc<RwLock<PeerTracker>>,
35 bucket_location: &BucketLocation,
36 encoded_post: &EncodedPostV1,
37 encoded_post_bytes: &EncodedPostBytesV1,
38 referenced_post_header_bytes: Option<&[u8]>,
39 referenced_hashtags: &[String],
40) -> anyhow::Result<Vec<SubmitPostCommitTokenV1>> {
41 let mut peers_visited = Vec::new();
42 let mut submit_post_claim_tokens = Vec::new();
43 let mut submit_post_commit_tokens = Vec::new();
44
45 let mut peer_tracker = peer_tracker.write().await;
46 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
47 while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
48 let result: anyhow::Result<()> = try {
49 info!("Requesting SubmitPostClaim with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
50
51 let request = SubmitPostClaimV1::new_to_bytes(bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes.bytes_without_body())?;
53 let requisite_pow = amplification::get_minimum_post_pow(encoded_post.header.post_length, encoded_post.header.linked_base_ids.len(), bucket_location.duration);
54 let response = rpc::rpc::rpc_server_known_with_requisite_pow_and_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostClaimV1, request, requisite_pow).await?;
55 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostClaimResponseV1, &response.response_request_kind);
56 let response = json::bytes_to_struct::<SubmitPostClaimResponseV1>(&response.bytes)?;
57
58 peers_visited.push(peer.clone());
59
60 peer_iter.add_peers(response.peers_nearer);
61
62 if let Some(submit_post_claim_token) = response.submit_post_claim_token {
63 info!("Received a SubmitPostClaim from peer {}", peer);
64
65 submit_post_claim_tokens.push(submit_post_claim_token.clone());
66
67 info!("Posting to peer {}", peer);
69 let request = SubmitPostCommitV1::new_to_bytes(bucket_location, &submit_post_claim_token, encoded_post_bytes.bytes())?;
70 let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostCommitV1, request).await?;
71 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostCommitResponseV1, &response.response_request_kind);
72 let response = json::bytes_to_struct::<SubmitPostCommitResponseV1>(&response.bytes)?;
73
74 submit_post_commit_tokens.push(response.submit_post_commit_token);
75 if submit_post_commit_tokens.len() >= config::REDUNDANT_SERVERS_PER_POST {
76 break;
77 }
78 }
79 };
80
81 if let Err(e) = result {
82 warn!("Error retrieving SubmitPostClaim from peer {}: {}", peer, e);
83 peer_iter.remove_peer(&peer);
84 }
85 }
86
87 if submit_post_claim_tokens.is_empty() {
88 anyhow::bail!("Despite expecting availability, we were unable claim anywhere at {}", bucket_location);
89 }
90
91 if submit_post_commit_tokens.is_empty() {
92 anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location);
93 }
94
95 Ok(submit_post_commit_tokens)
96}
97
98pub async fn post_feedback_to_location(
99 runtime_services: &RuntimeServices,
100 sponsor_id: &Id,
101 peer_tracker: &Arc<RwLock<PeerTracker>>,
102 bucket_location: &BucketLocation,
103 encoded_post_feedback: &EncodedPostFeedbackV1,
104) -> anyhow::Result<()> {
105
106 let mut peers_visited = Vec::new();
107 let mut submit_post_feedback_accepts = Vec::new();
108
109 let mut peer_tracker = peer_tracker.write().await;
110 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
111 while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
112 let result: anyhow::Result<()> = try {
113 info!("Requesting SubmitPostFeedbackV1 with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
114
115 let request = SubmitPostFeedbackV1::new_to_bytes(bucket_location, encoded_post_feedback)?;
116 let response = rpc::rpc::rpc_server_known_with_requisite_pow(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostFeedbackV1, request, config::POW_MINIMUM_PER_FEEDBACK).await?;
117 anyhow_assert_eq!(&PayloadResponseKind::SubmitPostFeedbackResponseV1, &response.response_request_kind);
118 let response = json::bytes_to_struct::<SubmitPostFeedbackResponseV1>(&response.bytes)?;
119
120 peers_visited.push(peer.clone());
121
122 peer_iter.add_peers(response.peers_nearer);
123
124 if response.accepted {
125 info!("Received an accept from peer {}", peer);
126
127 submit_post_feedback_accepts.push(peer.clone());
128 if submit_post_feedback_accepts.len() >= config::REDUNDANT_SERVERS_PER_POST {
129 break;
130 }
131 }
132 };
133
134 if let Err(e) = result {
135 warn!("Error retrieving SubmitPostFeedbackV1 from peer {}: {}", peer, e);
136 peer_iter.remove_peer(&peer);
137 }
138 }
139
140 if submit_post_feedback_accepts.is_empty() {
141 anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location.location_id);
142 }
143
144 Ok(())
145}