Skip to main content

hashiverse_lib/client/post_bundle/
posting.rs

1//! # Submit-post and submit-feedback client-side orchestration
2//!
3//! Two async functions that the higher-level client API calls when a user posts or
4//! reacts:
5//!
6//! - `post_to_location` — two-phase commit for a new post against the server(s)
7//!   responsible for the target bucket. `SubmitPostClaimV1` first reserves a slot and
8//!   returns a commit token (the server has now done its admission-control work);
9//!   `SubmitPostCommitV1` then uploads the full post bytes. Split into two so a big
10//!   upload can't be aborted halfway without the server having first agreed it wants
11//!   the post.
12//! - `post_feedback_to_location` — smaller single-phase RPC (`SubmitPostFeedbackV1`)
13//!   used for reactions / flags which carry their own PoW but no big payload to
14//!   reserve against.
15
16use crate::anyhow_assert_eq;
17use crate::client::peer_tracker::peer_tracker::PeerTracker;
18use crate::protocol::posting::encoded_post::{EncodedPostBytesV1, EncodedPostV1};
19use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind, SubmitPostClaimResponseV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1};
20use crate::protocol::rpc;
21use crate::tools::buckets::BucketLocation;
22use crate::tools::{config, json};
23use crate::tools::runtime_services::RuntimeServices;
24use log::{info, warn};
25use std::sync::Arc;
26use tokio::sync::RwLock;
27use crate::protocol::posting::amplification;
28use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
29use crate::tools::types::Id;
30
31pub async fn post_to_location(
32    runtime_services: &RuntimeServices,
33    sponsor_id: &Id,
34    peer_tracker: &Arc<RwLock<PeerTracker>>,
35    bucket_location: &BucketLocation,
36    encoded_post: &EncodedPostV1,
37    encoded_post_bytes: &EncodedPostBytesV1,
38    referenced_post_header_bytes: Option<&[u8]>,
39    referenced_hashtags: &[String],
40) -> anyhow::Result<Vec<SubmitPostCommitTokenV1>> {
41    let mut peers_visited = Vec::new();
42    let mut submit_post_claim_tokens = Vec::new();
43    let mut submit_post_commit_tokens = Vec::new();
44
45    let mut peer_tracker = peer_tracker.write().await;
46    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
47    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
48        let result: anyhow::Result<()> = try {
49            info!("Requesting SubmitPostClaim with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
50
51            // First we need to get permission to post
52            let request = SubmitPostClaimV1::new_to_bytes(bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes.bytes_without_body())?;
53            let requisite_pow = amplification::get_minimum_post_pow(encoded_post.header.post_length, encoded_post.header.linked_base_ids.len(), bucket_location.duration);
54            let response = rpc::rpc::rpc_server_known_with_requisite_pow_and_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostClaimV1, request, requisite_pow).await?;
55            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostClaimResponseV1, &response.response_request_kind);
56            let response = json::bytes_to_struct::<SubmitPostClaimResponseV1>(&response.bytes)?;
57
58            peers_visited.push(peer.clone());
59
60            peer_iter.add_peers(response.peers_nearer);
61
62            if let Some(submit_post_claim_token) = response.submit_post_claim_token {
63                info!("Received a SubmitPostClaim from peer {}", peer);
64
65                submit_post_claim_tokens.push(submit_post_claim_token.clone());
66
67                // We have permission, so do the post!
68                info!("Posting to peer {}", peer);
69                let request = SubmitPostCommitV1::new_to_bytes(bucket_location, &submit_post_claim_token, encoded_post_bytes.bytes())?;
70                let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostCommitV1, request).await?;
71                anyhow_assert_eq!(&PayloadResponseKind::SubmitPostCommitResponseV1, &response.response_request_kind);
72                let response = json::bytes_to_struct::<SubmitPostCommitResponseV1>(&response.bytes)?;
73
74                submit_post_commit_tokens.push(response.submit_post_commit_token);
75                if submit_post_commit_tokens.len() >= config::REDUNDANT_SERVERS_PER_POST {
76                    break;
77                }
78            }
79        };
80
81        if let Err(e) = result {
82            warn!("Error retrieving SubmitPostClaim from peer {}: {}", peer, e);
83            peer_iter.remove_peer(&peer);
84        }
85    }
86
87    if submit_post_claim_tokens.is_empty() {
88        anyhow::bail!("Despite expecting availability, we were unable claim anywhere at {}", bucket_location);
89    }
90
91    if submit_post_commit_tokens.is_empty() {
92        anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location);
93    }
94
95    Ok(submit_post_commit_tokens)
96}
97
98pub async fn post_feedback_to_location(
99    runtime_services: &RuntimeServices,
100    sponsor_id: &Id,
101    peer_tracker: &Arc<RwLock<PeerTracker>>,
102    bucket_location: &BucketLocation,
103    encoded_post_feedback: &EncodedPostFeedbackV1,
104) -> anyhow::Result<()> {
105
106    let mut peers_visited = Vec::new();
107    let mut submit_post_feedback_accepts = Vec::new();
108
109    let mut peer_tracker = peer_tracker.write().await;
110    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
111    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
112        let result: anyhow::Result<()> = try {
113            info!("Requesting SubmitPostFeedbackV1 with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
114
115            let request = SubmitPostFeedbackV1::new_to_bytes(bucket_location, encoded_post_feedback)?;
116            let response = rpc::rpc::rpc_server_known_with_requisite_pow(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostFeedbackV1, request, config::POW_MINIMUM_PER_FEEDBACK).await?;
117            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostFeedbackResponseV1, &response.response_request_kind);
118            let response = json::bytes_to_struct::<SubmitPostFeedbackResponseV1>(&response.bytes)?;
119
120            peers_visited.push(peer.clone());
121
122            peer_iter.add_peers(response.peers_nearer);
123
124            if response.accepted {
125                info!("Received an accept from peer {}", peer);
126
127                submit_post_feedback_accepts.push(peer.clone());
128                if submit_post_feedback_accepts.len() >= config::REDUNDANT_SERVERS_PER_POST {
129                    break;
130                }
131            }
132        };
133
134        if let Err(e) = result {
135            warn!("Error retrieving SubmitPostFeedbackV1 from peer {}: {}", peer, e);
136            peer_iter.remove_peer(&peer);
137        }
138    }
139
140    if submit_post_feedback_accepts.is_empty() {
141        anyhow::bail!("Despite expecting availability, we were unable commit anywhere at {}", bucket_location.location_id);
142    }
143
144    Ok(())
145}