1use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29 AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30 GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31 HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1,
32 SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
44use hashiverse_lib::tools::types::{Id, Signature};
45use hashiverse_lib::tools::{hashing, url_preview};
46use hashiverse_lib::tools::{config, json, BytesGatherer};
47use hashiverse_lib::transport::transport::IncomingRequest;
48use log::{info, trace, warn};
49use std::collections::HashSet;
50use tokio::sync::mpsc;
51use tokio_util::sync::CancellationToken;
52
/// Hashtags used to pad out a trending-hashtags response when fewer than the
/// requested number of organically trending hashtags are known (see
/// `top_up_trending_hashtags_with_fallback`).
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
59
/// Canonical form of a hashtag for deduplication: lowercase, without any
/// single leading `'#'`.
fn normalise_hashtag(hashtag: &str) -> String {
    let lowercase = hashtag.to_lowercase();
    if let Some(stripped) = lowercase.strip_prefix('#') {
        return stripped.to_owned();
    }
    lowercase
}
69
70fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
75 let target_length = limit as usize;
76 if trending_hashtags.len() >= target_length {
77 return;
78 }
79
80 let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
81 .map(|entry| normalise_hashtag(&entry.hashtag))
82 .collect();
83
84 for fallback_hashtag in fallback_hashtags {
85 if trending_hashtags.len() >= target_length {
86 break;
87 }
88 let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
89 if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
90 continue;
91 }
92 trending_hashtags.push(TrendingHashtagV1 {
93 hashtag: (*fallback_hashtag).to_string(),
94 count: 0,
95 });
96 existing_normalised_hashtags.insert(normalised_fallback_hashtag);
97 }
98}
99
100impl HashiverseServer {
    /// Main request-service loop: pulls `IncomingRequest`s off the transport
    /// channel and processes them one at a time until either the cancellation
    /// token fires or the channel closes.
    pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
        loop {
            tokio::select! {
                // Graceful shutdown requested.
                _ = cancellation_token.cancelled() => { break },

                receipt = rx.recv() => {
                    match receipt {
                        Some(incoming) => {
                            // NOTE: requests are handled sequentially here; a slow
                            // request delays everything queued behind it.
                            let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
                            match result {
                                Ok(bytes) => {
                                    // The remote side may have gone away already;
                                    // a failed send is only worth a warning.
                                    let result = incoming.reply.send(bytes);
                                    if result.is_err() { warn!("failed to send reply"); }
                                },
                                Err(e) => {
                                    // Decode/replay failure: penalise the caller and
                                    // drop the reply channel without answering.
                                    warn!("failed to process packet from {}: {}", incoming.caller_address, e);
                                    incoming.report_bad_request();
                                    drop(incoming.reply);
                                },
                            }
                        },
                        None => {
                            // All senders dropped — the transport has shut down.
                            warn!("channel closed");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
134
135 async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
136 let caller_address = incoming.caller_address.as_str();
137 let current_time_millis = self.runtime_services.time_provider.current_time_millis();
138
139 let rpc_request_packet_rx = RpcRequestPacketRx::decode(¤t_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
141 {
145 if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
146 anyhow::bail!("replay detected: salt already seen");
147 }
148 self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
149 }
150
151 let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
153
154 let dispatch_result: anyhow::Result<BytesGatherer> = try {
155 let pow = match rpc_request_packet_rx.pow_server_known {
157 true => {
158 let (pow, improved_pow_current_day, improved_pow_current_month) = {
159 let peer_self = self.peer_self.read(); let pow = PeerPow::new(
161 rpc_request_packet_rx.pow_sponsor_id,
162 &peer_self.verification_key_bytes,
163 &peer_self.pq_commitment_bytes,
164 rpc_request_packet_rx.pow_timestamp,
165 rpc_request_packet_rx.pow_content_hash,
166 rpc_request_packet_rx.pow_salt,
167 )?;
168
169 let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
170 let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
171
172 (pow, improved_pow_current_day, improved_pow_current_month)
173 };
174
175 if improved_pow_current_day || improved_pow_current_month {
177 let mut peer_self = self.peer_self.write(); if improved_pow_current_day {
179 trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
180 peer_self.pow_current_day = pow.clone();
181 }
182 if improved_pow_current_month {
183 trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
184 peer_self.pow_current_month = pow.clone();
185 }
186
187 peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
188 }
189
190 Some(pow)
191 }
192
193 false => {
194 match rpc_request_packet_rx.payload_request_kind {
196 PayloadRequestKind::BootstrapV1 => {}
197 _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
198 }
199
200 None
201 }
202 };
203
204 let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
206 let response_flags = match compress_response {
207 true => RpcResponsePacketTxFlags::COMPRESSED,
208 false => RpcResponsePacketTxFlags::empty(),
209 };
210
211 RpcResponsePacketTx::encode(
213 &self.server_id.keys.signature_key,
214 &self.server_id.keys.verification_key_bytes,
215 &self.server_id.keys.pq_commitment_bytes,
216 &self.server_id.sponsor_id,
217 &self.server_id.timestamp,
218 &self.server_id.hash,
219 &self.server_id.salt,
220 &pow_content_hash,
221 response_flags,
222 payload_response_kind,
223 payload,
224 )?
225 };
226
227 match dispatch_result {
228 Ok(results) => Ok(results),
229 Err(e) => {
230 warn!("failed to dispatch packet from {}: {}", caller_address, e);
231 incoming.report_bad_request();
232
233 let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
234 let response = ErrorResponseV1 { code: 0, message: e.to_string() };
235 let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
236
237 RpcResponsePacketTx::encode(
239 &self.server_id.keys.signature_key,
240 &self.server_id.keys.verification_key_bytes,
241 &self.server_id.keys.pq_commitment_bytes,
242 &self.server_id.sponsor_id,
243 &self.server_id.timestamp,
244 &self.server_id.hash,
245 &self.server_id.salt,
246 &pow_content_hash,
247 RpcResponsePacketTxFlags::COMPRESSED,
248 payload_response_kind,
249 payload,
250 )
251 }
252 }
253 }
254
255 async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
256 let compress_response = match rpc_request_packet_rx.payload_request_kind {
258 PayloadRequestKind::GetPostBundleV1 => false, PayloadRequestKind::CachePostBundleV1 => false, _ => true,
261 };
262
263 let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
264 PayloadRequestKind::ErrorV1 => {
265 anyhow::bail!("Received ErrorV1");
266 }
267 PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
268 PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
269 PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
270 PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
271 PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
272 PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
273 PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
274 PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
275 PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
276 PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
277 PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
278 PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279 PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
280 PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
281 PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
282 };
283
284 Ok((compress_response, payload_response_kind, payload))
285 }
286
287 #[allow(non_snake_case)]
288 async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
289 anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
290 let peer = self.peer_self.read().clone();
291 let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
292 Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
293 }
294
295 #[allow(non_snake_case)]
296 async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
297 anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
298 let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
299 let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
300 Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
301 }
302
303 #[allow(non_snake_case)]
304 async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
305 anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
306
307 let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
308 let peer = request.peer_self;
311 let peer_id = peer.id;
312
313 self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
315
316 let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
317
318 let json = json::struct_to_bytes(&AnnounceResponseV1 {
319 peer_self: self.peer_self.read().clone(),
320 peers_nearest,
321 })?;
322 Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
323 }
324
    /// Serves a post bundle for a bucket location we are responsible for.
    ///
    /// If we are among the nearest peers for the location, the stored bundle is
    /// returned — sealing it and re-signing its header as a side effect when
    /// its bucket window has elapsed. If no bundle exists yet, an empty signed
    /// bundle is fabricated. Otherwise only routing/cache info is returned.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
        trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);

        request.bucket_location.validate()?;

        {
            // Opportunistically learn about peers the requester has visited.
            for peer in request.peers_visited {
                self.add_potential_peer_to_kademlia(peer, time_millis).await;
            }
        }

        let peer_self = self.peer_self.read().clone();

        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
        if !among_peers_nearer {
            warn!("I am not in peers_nearer {}", peer_self);
        }

        let post_bundle = match among_peers_nearer {
            true => {
                // Held for the whole read-modify-write below so a concurrent
                // commit cannot interleave with sealing.
                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);

                let mut encoded_post_bundle_bytes: Option<Bytes> = None;

                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
                if let Some(mut post_bundle_metadata) = post_bundle_metadata {
                    encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;

                    if !post_bundle_metadata.sealed {
                        // A bundle becomes sealed once its bucket window plus
                        // grace threshold has elapsed.
                        let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
                        if sealed {
                            // Seal: rewrite the header, re-sign, and persist the
                            // sealed bundle bytes and metadata.
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.sealed = true;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }

                            post_bundle_metadata.sealed = true;
                            self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
                        }
                        else {
                            // Not sealed yet: refresh the header timestamp and
                            // signature for this response only (not persisted).
                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
                                encoded_post_bundle.header.time_millis = time_millis;
                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
                            }
                        }
                    }
                };

                // No stored bundle: fabricate an empty signed bundle so the
                // caller still gets an authenticated answer.
                if encoded_post_bundle_bytes.is_none() {
                    let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;

                    let mut header = EncodedPostBundleHeaderV1 {
                        time_millis,
                        location_id: request.bucket_location.location_id,
                        overflowed: false,
                        sealed,
                        num_posts: 0,
                        encoded_post_ids: vec![],
                        encoded_post_lengths: vec![],
                        encoded_post_healed: HashSet::new(),
                        peer: peer_self.clone(),
                        signature: Signature::zero(),
                    };
                    header.signature_generate(&self.server_id.keys.signature_key)?;

                    let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
                    encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
                }

                encoded_post_bundle_bytes
            }
            false => None,
        };

        let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);

        let get_post_bundle_response = GetPostBundleResponseV1 {
            peers_nearer,
            cache_request_token: cache_result.cache_request_token,
            post_bundles_cached: cache_result.cached_items,
            post_bundle,
        };
        Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
    }
435
436 #[allow(non_snake_case)]
437 async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
438 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
439
440 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
441
442 let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
443 trace!("received GetPostBundleFeedbackV1");
444
445 {
447 for peer in request.peers_visited {
448 self.add_potential_peer_to_kademlia(peer, time_millis).await;
449 }
450 }
451
452 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
454
455 let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
456
457 if among_peers_nearer {
458 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
460 if post_bundle_metadata.is_some() {
461 post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
462 }
463 }
464
465 let peer_self = self.peer_self.read().clone();
467 let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
468 Some(feedbacks_bytes) => {
469
470 let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
471
472 let mut header = EncodedPostBundleFeedbackHeaderV1 {
473 time_millis,
474 location_id: request.bucket_location.location_id,
475 feedbacks_bytes_hash,
476 peer: peer_self.clone(),
477 signature: Signature::zero(),
478 };
479 header.signature_generate(&self.server_id.keys.signature_key);
480
481 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
482 header,
483 feedbacks_bytes,
484 };
485 Some(encoded_post_bundle_feedback.to_bytes()?)
486 }
487 None => None,
488 };
489
490 let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
491
492 let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
493 peers_nearer,
494 cache_request_token: cache_result.cache_request_token,
495 post_bundle_feedbacks_cached: cache_result.cached_items,
496 encoded_post_bundle_feedback,
497 };
498 Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
499 }
500
    /// Validates a post-submission claim and, if we are responsible for the
    /// bucket and it is not sealed, grants a signed `SubmitPostClaimTokenV1`
    /// that entitles the caller to commit the post later.
    ///
    /// Validation order: pow presence → bucket location → bucket duration →
    /// post decode → pow amount → timestamp/bucket match → base_id linkage →
    /// bucket-type rules → clock skew. A granted claim for a user bucket also
    /// feeds the trending-hashtags HyperLogLog counters.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        // Claims always need pow; anonymous callers are rejected.
        let pow = match pow {
            Some(pow) => pow,
            None => anyhow::bail!("We need pow for a submit post claim"),
        };

        let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
        trace!("received SubmitPostClaimV1");

        request.bucket_location.validate()?;

        // The requested duration must be one of the protocol's fixed buckets.
        let bucket_duration = {
            let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
            match bucket_duration {
                Some(bucket_duration) => *bucket_duration,
                None => anyhow::bail!("Unrecognised bucket duration provided"),
            }
        };

        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;

        {
            // Required pow scales with post length, link count and duration.
            let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
            if pow.pow < pow_minimum {
                anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
            }
        }

        {
            // The post's timestamp, rounded to a bucket start, must land in
            // exactly the bucket being claimed.
            let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
            if timestamp != request.bucket_location.bucket_time_millis {
                anyhow::bail!("The post timestamp does not match the bucket");
            }
        }

        let client_id = decoded_post.header.client_id()?;

        if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
            anyhow::bail!("The base_id is not related to the post");
        }

        if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
            anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
        }

        // Replies and sequels must prove the referenced post really exists and
        // matches the bucket; sequels must additionally share the author.
        if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
            let original_header_bytes = request.referenced_post_header_bytes
                .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;

            let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;

            if original_post.post_id != request.bucket_location.base_id {
                anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
            }

            if request.bucket_location.bucket_type == BucketType::Sequel {
                let original_client_id = original_post.header.client_id()?;
                if original_client_id != client_id {
                    anyhow::bail!("Sequel post author does not match original post author");
                }
            }
        }

        {
            // Reject posts whose claimed timestamp is too far from our clock.
            let delta = (time_millis - decoded_post.header.time_millis).abs();
            if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
                anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
            }
        }

        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);

        let submit_post_claim_token = match among_peers_nearer {
            true => {
                // Grant accounting happens under the per-location write lock so
                // concurrent claims cannot both slip under the overflow limits.
                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
                let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

                if !post_bundle_metadata.sealed {
                    // Count the grant, then derive overflow/sealed state from
                    // the updated counters and elapsed bucket window.
                    post_bundle_metadata.num_posts_granted += 1;
                    post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
                    post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;

                    self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
                }

                match post_bundle_metadata.sealed {
                    false => {
                        info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
                        Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
                    }
                    true => {
                        info!(
                            "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
                            request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
                        );
                        None
                    }
                }
            }

            false => None,
        };

        if submit_post_claim_token.is_some() {
            // Trending hashtags: count distinct authors per hashtag with a
            // HyperLogLog, but only for hashtags the post actually links.
            if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
                let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
                for referenced_hashtag in &request.referenced_hashtags {
                    let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
                        Ok(id) => id,
                        Err(_) => continue, };
                    if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
                        continue; }
                    // NOTE(review): get-then-insert is not atomic; assumes
                    // trending_hashtags tolerates lost updates — confirm.
                    let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_else(HyperLogLog::new);
                    hll.insert(author_verification_key_bytes.as_ref());
                    self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
                }
            }
        }

        let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
        Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
651
    /// Commits a previously claimed post into the bucket's bundle: verifies
    /// the claim token is ours and matches the post, appends the encoded post
    /// to the stored bundle under the per-location write lock, re-signs the
    /// bundle header, persists bundle + metadata, and returns a signed commit
    /// token.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
        trace!("received SubmitPostCommitV1");

        // The claim token must have been issued by this very server.
        // NOTE(review): this read guard stays held for the rest of the
        // function (it is reused to build the commit token at the end).
        let peer_self = self.peer_self.read(); if request.submit_post_claim_token.peer.id != peer_self.id {
            anyhow::bail!("The submit_post_claim_token is not from us");
        }

        request.bucket_location.validate()?;
        if request.bucket_location != request.submit_post_claim_token.bucket_location {
            anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
        }

        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;

        if decoded_post.post_id != request.submit_post_claim_token.post_id {
            anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
        }

        // Per-location write lock guards the bundle read-modify-write below.
        let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;

        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

        // Load the existing bundle, or start an empty one for this location.
        // NOTE(review): no check that the bundle is already sealed before
        // appending — confirm a commit into a sealed bundle is intended.
        let mut post_bundle = match post_bundle_bytes {
            Some(bytes) => {
                let bytes = Bytes::from_owner(bytes);
                let bundle = EncodedPostBundleV1::from_bytes(bytes, true)?;
                bundle
            }
            None => {
                let header = EncodedPostBundleHeaderV1 {
                    time_millis: TimeMillis::zero(),
                    location_id: request.bucket_location.location_id,
                    overflowed: false,
                    sealed: false,
                    num_posts: 0,
                    encoded_post_ids: vec![],
                    encoded_post_lengths: vec![],
                    encoded_post_healed: HashSet::new(),
                    peer: self.peer_self.read().clone(),
                    signature: Signature::zero(),
                };

                EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
            }
        };

        // Idempotence guard: each post may only ever be committed once.
        if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
            anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
        }

        // Append the post, refresh derived header state, and re-sign.
        post_bundle.header.time_millis = time_millis;
        post_bundle.header.num_posts += 1;
        post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
        post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
        post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
        post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
        post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
        posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
        post_bundle.encoded_posts_bytes = posts_mut.freeze();
        let post_bundle_bytes_new = post_bundle.to_bytes()?;

        // Mirror the header's derived state into the metadata record.
        post_bundle_metadata.num_posts = post_bundle.header.num_posts;
        post_bundle_metadata.overflowed = post_bundle.header.overflowed;
        post_bundle_metadata.sealed = post_bundle.header.sealed;
        post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();

        {
            self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
            self.environment
                .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
        }

        info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);

        let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);

        let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
        Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
752
    /// Handles `SubmitPostFeedbackV1`: stores feedback for a post held in one of our
    /// bundles, provided the sender attached sufficient proof-of-work and we are among
    /// the peers responsible for the post's location key.
    ///
    /// Returns a `SubmitPostFeedbackResponseV1` carrying `peers_nearer` (so callers can
    /// retry against better-placed servers) and whether the feedback was accepted.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();

        let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
        trace!("received SubmitPostFeedbackV1");

        // Feedback submission is rate-limited via connection-level proof-of-work;
        // reject outright when it is missing or below the per-feedback minimum.
        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
        if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
            anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
        }

        // The feedback payload carries its own embedded PoW; verify it before any
        // storage work is attempted.
        request.encoded_post_feedback.pow_verify()?;

        let location_id = request.bucket_location.location_id;

        // `among_peers_nearer` tells us whether this server is responsible for the key;
        // `peers_nearer` is always echoed back so the caller can find better homes.
        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);

        // Accept only if we are responsible AND our stored bundle actually contains the
        // referenced post. The closure lets `?` failures propagate as errors while the
        // ordinary "not accepted" cases yield Ok(false).
        let accepted = (|| -> anyhow::Result<bool> {
            if !among_peers_nearer {
                return Ok(false);
            }

            let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
            let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
            else {
                return Ok(false);
            };

            let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
            if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
                return Ok(false);
            }

            Ok(true)
        })()?;

        if accepted {
            // NOTE(review): the read lock above is released when the closure returns, so
            // this write happens outside it — assumes bundles never lose posts between
            // the membership check and this store; confirm.
            trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
            self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
        }

        let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
        Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
805
    /// Handles `HealPostBundleClaimV1`: a donor peer advertises the posts in its copy of
    /// a bundle; we answer with the post ids we are missing plus a signed claim token
    /// that the donor must present in the follow-up `HealPostBundleCommitV1`.
    ///
    /// A negative response (empty `needed_post_ids`, no token) is returned when we are
    /// not responsible for the key, a heal is already in flight, or we need nothing.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);

        // Single builder for the "nothing needed / not accepting" reply.
        fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
            let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
            Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
        }

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
        let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
        trace!("received HealPostBundleClaimV1");

        // The advertised bucket must be valid, must match the donor header's location,
        // and the donor header must carry a valid signature.
        request.bucket_location.validate()?;
        if request.bucket_location.location_id != request.donor_header.location_id {
            anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
        }

        request.donor_header.verify()?;

        // Only accept heals for keys this server is responsible for.
        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
        if !among_peers_nearer {
            return generate_negatory_response();
        }

        // At most one heal per location at a time.
        // NOTE(review): contains_key here plus insert below is not atomic; assumes
        // concurrent claims for the same location are rare/harmless — confirm.
        if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
            return generate_negatory_response();
        }

        // Snapshot the post ids we already hold, under the bundle read lock.
        let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;

        let our_post_ids: HashSet<Id> = match post_bundle_bytes {
            Some(bytes) => {
                let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
                bundle.header.encoded_post_ids.into_iter().collect()
            }
            None => HashSet::new(),
        };

        // The posts the donor has that we are missing.
        let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();

        if needed_post_ids.is_empty() {
            return generate_negatory_response();
        }

        // Mark the heal as in progress; cleared when HealPostBundleCommitV1 completes
        // (presumably also expired by the map's own eviction — TODO confirm).
        self.heal_in_progress.insert(request.donor_header.location_id, ());

        // The token binds our identity, the bucket, the needed ids, and the donor
        // header's signature, so the later commit can be validated statelessly.
        let token = Some(HealPostBundleClaimTokenV1::new(
            self.peer_self.read().clone(),
            request.bucket_location,
            needed_post_ids.clone(),
            request.donor_header.signature,
            &self.server_id.keys.signature_key,
        ));
        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
        Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
870
    /// Handles `HealPostBundleCommitV1`: the donor delivers the post bytes promised in a
    /// previously issued `HealPostBundleClaimTokenV1`. After validating the token and
    /// donor header, the payload is split into per-post byte slices, each post is
    /// decrypt-checked, and any posts we still lack are appended to our bundle under the
    /// location's write lock. The healed post ids are recorded in
    /// `encoded_post_healed` and the bundle is re-signed and persisted.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);

        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
        let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
        trace!("received HealPostBundleCommitV1");

        // The token must have been minted by this server and still verify.
        let peer_self = self.peer_self.read().clone();
        if request.token.peer.id != peer_self.id {
            anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
        }
        request.token.verify()?;

        // The donor header presented now must be the exact one the token was issued
        // against (bound via its signature), and must itself verify.
        if request.donor_header.signature != request.token.donor_header_signature {
            anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
        }
        request.donor_header.verify()?;

        if request.token.bucket_location.location_id != request.donor_header.location_id {
            anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
        }

        let location_id = request.donor_header.location_id;

        // Slice the concatenated payload into one Bytes per needed post, using the
        // donor header's per-post lengths. Posts must arrive in token order, sizes must
        // add up exactly — any shortfall or excess rejects the whole commit.
        let mut remaining_bytes = request.encoded_posts_bytes.clone();
        let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
        for post_id in &request.token.needed_post_ids {
            let len = request
                .donor_header
                .encoded_post_ids
                .iter()
                .zip(request.donor_header.encoded_post_lengths.iter())
                .find(|(id, _)| *id == post_id)
                .map(|(_, len)| *len)
                .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
            if remaining_bytes.len() < len {
                anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
            }
            let post_bytes = remaining_bytes.split_to(len);
            posts_to_add.push((*post_id, post_bytes));
        }
        if !remaining_bytes.is_empty() {
            anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
        }

        // Every candidate post must decode/decrypt against the bucket's base id before
        // anything is persisted; one bad post rejects the whole commit.
        for (post_id, post_bytes) in &posts_to_add {
            EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
        }

        // NOTE(review): any `?` above returns without reaching the
        // heal_in_progress.invalidate() below, leaving the heal marked in-progress —
        // presumably the map has its own TTL eviction; confirm.
        let _lock = self.environment.get_write_lock_for_location_id(&location_id);
        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);

        // Load our existing bundle, or start a fresh one seeded with the donor's
        // overflowed/sealed flags when we have nothing yet.
        let mut post_bundle = match post_bundle_bytes {
            Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
            None => EncodedPostBundleV1 {
                header: EncodedPostBundleHeaderV1 {
                    time_millis: TimeMillis::zero(),
                    location_id,
                    overflowed: request.donor_header.overflowed,
                    sealed: request.donor_header.sealed,
                    num_posts: 0,
                    encoded_post_ids: vec![],
                    encoded_post_lengths: vec![],
                    encoded_post_healed: HashSet::new(),
                    peer: peer_self.clone(),
                    signature: Signature::zero(),
                },
                encoded_posts_bytes: Bytes::new(),
            },
        };

        let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();

        // Append only posts we still lack (the bundle may have changed since the claim
        // was issued); track whether anything was actually added.
        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
        let mut added_any = false;
        for (post_id, post_bytes) in posts_to_add {
            if !our_post_ids.contains(&post_id) {
                let len = post_bytes.len();
                posts_mut.extend_from_slice(&post_bytes);
                post_bundle.header.encoded_post_ids.push(post_id);
                post_bundle.header.encoded_post_lengths.push(len);
                post_bundle.header.encoded_post_healed.insert(post_id);
                added_any = true;
            }
        }
        post_bundle.encoded_posts_bytes = posts_mut.freeze();

        if added_any {
            post_bundle.header.time_millis = time_millis;
            // NOTE(review): `as u8` silently truncates past 255 posts — presumably the
            // overflow limits keep bundles far below that; confirm.
            post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
            post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;

            let new_bytes = post_bundle.to_bytes()?;
            post_bundle_metadata.num_posts = post_bundle.header.num_posts;
            post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();

            self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
            // Last argument 0: unlike the submit path, healed bytes are apparently not
            // counted toward whatever the final argument accounts for — TODO confirm.
            self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;

            info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
        }

        // Release the in-progress marker set by HealPostBundleClaimV1.
        self.heal_in_progress.invalidate(&location_id);

        let response = HealPostBundleCommitResponseV1 { accepted: added_any };
        Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
    }
986
987 #[allow(non_snake_case)]
988 async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
989 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
990
991 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
992 let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
993 trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
994
995 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
997 if !among_peers_nearer {
998 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
999 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1000 }
1001
1002 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1004 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1005 let Some(post_bundle_bytes) = post_bundle_bytes
1006 else {
1007 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1008 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1009 };
1010 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1011
1012 let mut accepted_count: u32 = 0;
1013 for feedback in &request.encoded_post_feedbacks {
1014 if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1016 continue;
1017 }
1018 self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1019 accepted_count += 1;
1020 }
1021
1022 if accepted_count > 0 {
1023 trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1024 }
1025
1026 let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1027 Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1028 }
1029
1030 #[allow(non_snake_case)]
1031 async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1032 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1033
1034 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1035 let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1036 trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1037
1038 let peer_self = self.peer_self.read().clone();
1040 if request.token.peer.id != peer_self.id {
1041 anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1042 }
1043 request.token.verify()?;
1044 if request.token.is_expired(time_millis) {
1045 anyhow::bail!("CachePostBundleV1: token has expired");
1046 }
1047
1048 let mut any_accepted = false;
1049 for bundle_bytes in request.encoded_post_bundles {
1050 let parse_result: anyhow::Result<()> = try {
1051 let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1052
1053 encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1055
1056 let originator_peer_id = encoded_post_bundle.header.peer.id;
1057 let is_sealed = encoded_post_bundle.header.sealed;
1058 if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1059 any_accepted = true;
1060 }
1061 };
1062 if let Err(e) = &parse_result {
1063 warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1064 }
1065 }
1066 let response = CachePostBundleResponseV1 { accepted: any_accepted };
1067 Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1068 }
1069
1070 #[allow(non_snake_case)]
1071 async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1072 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1073
1074 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1075 let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1076 trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1077
1078 let peer_self = self.peer_self.read().clone();
1080 if request.token.peer.id != peer_self.id {
1081 anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1082 }
1083 request.token.verify()?;
1084 if request.token.is_expired(time_millis) {
1085 anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1086 }
1087
1088 let result: anyhow::Result<bool> = try {
1089 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1090
1091 encoded_post_bundle_feedback.verify()?;
1092
1093 let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1094 self.post_bundle_feedback_cache
1096 .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1097 };
1098 let accepted = result.unwrap_or_else(|e| {
1099 warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1100 false
1101 });
1102
1103 let response = CachePostBundleFeedbackResponseV1 { accepted };
1104 Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1105 }
1106
    /// Handles `FetchUrlPreviewV1`: fetches an external HTTPS page on behalf of a client
    /// and returns extracted preview metadata (title, description, image).
    ///
    /// Because this makes a server-side request to an attacker-chosen URL, the handler
    /// layers several SSRF defenses: https-only, no bare IPs, DNS resolved once up front
    /// and checked against protected ranges, resolution pinned into the HTTP client (so
    /// a second lookup cannot rebind to an internal address), no redirects, no proxy,
    /// tight timeouts, and a hard body-size cap.
    #[allow(non_snake_case)]
    async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
        anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);

        let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
        trace!("received FetchUrlPreviewV1 for url={}", request.url);

        // Layer 1: only https URLs; rejects file://, http://, gopher://, etc.
        if !request.url.starts_with("https://") {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
        }

        // Layer 2: extract the hostname by hand — authority ends at '/', '?' or '#';
        // a bracketed IPv6 literal keeps its ':'s, otherwise strip a trailing :port.
        let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
        let host = if host_and_port.starts_with('[') {
            host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
        } else {
            host_and_port.split(':').next().unwrap_or(host_and_port)
        };
        if host.is_empty() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
        }
        // Layer 3: bare IP addresses (v4 or v6) are rejected outright; only DNS names
        // that pass the resolution check below are allowed.
        if host.parse::<std::net::IpAddr>().is_ok() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
        }

        // Layer 4: resolve the host ourselves and reject if ANY resolved address is a
        // protected (private/loopback/link-local/etc.) range.
        let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
            .await
            .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
            .collect();
        if resolved_socket_addrs.is_empty() {
            anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
        }
        for socket_addr in &resolved_socket_addrs {
            let ip = socket_addr.ip();
            if is_ssrf_protected_ip(ip) {
                anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
            }
        }

        // Layer 5: pin the checked addresses into the client (resolve_to_addrs), so the
        // request cannot re-resolve to a different (possibly internal) IP — closes the
        // DNS-rebinding TOCTOU. Redirects are disabled so the response cannot bounce us
        // to an unchecked host; no_proxy avoids routing through an ambient proxy.
        let http_client = reqwest::Client::builder()
            .connect_timeout(std::time::Duration::from_secs(1))
            .timeout(std::time::Duration::from_secs(3))
            .user_agent("hashiverse-preview/1.0")
            .resolve_to_addrs(host, &resolved_socket_addrs)
            .redirect(reqwest::redirect::Policy::none())
            .no_proxy()
            .build()?;

        // Layer 6: cap the body size. Content-Length is checked when present, and the
        // streaming loop below enforces the cap regardless (truncating, not failing).
        const URL_FETCH_MAX_BODY_BYTES: usize = 512 * 1024;
        let mut http_response = http_client.get(&request.url).send().await?;

        if let Some(content_length) = http_response.content_length() {
            if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
                anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
            }
        }
        let mut body_bytes = BytesMut::new();
        while let Some(chunk) = http_response.chunk().await? {
            // `remaining` is always > 0 here: the break below fires before the buffer
            // can reach the cap, so this subtraction cannot underflow.
            let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
            body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
            if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
                break;
            }
        }
        // Lossy conversion: preview extraction tolerates replacement characters.
        let html = String::from_utf8_lossy(&body_bytes).into_owned();

        let preview_data = url_preview::extract_url_preview(&html);

        // Prefer the page's canonical URL when the extractor found one.
        let response = FetchUrlPreviewResponseV1 {
            url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
            title: preview_data.title,
            description: preview_data.description,
            image_url: preview_data.image_url,
        };

        Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
    }
1202
1203 #[allow(non_snake_case)]
1204 async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1205 anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1206
1207 let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1208 trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1209
1210 let time_millis = self.runtime_services.time_provider.current_time_millis();
1211
1212 let cached_response = {
1214 let cache = self.trending_hashtags_response_cache.lock();
1215 match cache.as_ref() {
1216 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1217 Some(cached_response.clone())
1218 }
1219 _ => None,
1220 }
1221 };
1222
1223 let mut response = match cached_response {
1224 Some(mut cached) => {
1225 cached.trending_hashtags.truncate(request.limit as usize);
1226 cached
1227 }
1228 None => {
1229 let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1231 .map(|(hashtag, hll)| TrendingHashtagV1 {
1232 hashtag: hashtag.as_ref().clone(),
1233 count: hll.count(),
1234 })
1235 .filter(|entry| entry.count > 0)
1236 .collect();
1237
1238 trending_hashtags.sort_by(|a, b| b.count.cmp(&a.count));
1239
1240 let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1241
1242 {
1244 let mut cache = self.trending_hashtags_response_cache.lock();
1245 *cache = Some((time_millis, full_response.clone()));
1246 }
1247
1248 let mut truncated_response = full_response;
1249 truncated_response.trending_hashtags.truncate(request.limit as usize);
1250 truncated_response
1251 }
1252 };
1253
1254 top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1255
1256 Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1257 }
1258}
1259
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for a trending-hashtag entry with an explicit count.
    fn tag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
        TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
    }

    #[test]
    fn top_up_adds_fallback_when_empty() {
        // An empty list is filled entirely from the fallback, with zero counts.
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 5, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].hashtag, "#hashiverse");
        assert_eq!(tags[0].count, 0);
        assert_eq!(tags[1].hashtag, "#news");
        assert_eq!(tags[1].count, 0);
    }

    #[test]
    fn top_up_respects_limit_smaller_than_fallback_list() {
        // Only as many fillers as the limit allows are taken.
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 1, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0].hashtag, "#hashiverse");
    }

    #[test]
    fn top_up_preserves_fallback_order() {
        // Fillers appear in the same order as the fallback slice.
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 3, &["#first", "#second", "#third"]);
        assert_eq!(tags[0].hashtag, "#first");
        assert_eq!(tags[1].hashtag, "#second");
        assert_eq!(tags[2].hashtag, "#third");
    }

    #[test]
    fn top_up_is_noop_when_already_at_limit() {
        // A list already at the limit is left untouched.
        let mut tags = vec![tag("#rust", 10), tag("#golang", 5)];
        top_up_trending_hashtags_with_fallback(&mut tags, 2, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].hashtag, "#rust");
        assert_eq!(tags[1].hashtag, "#golang");
    }

    #[test]
    fn top_up_partially_fills_when_real_trending_exists() {
        // Real entries stay in front; fillers are appended after them.
        let mut tags = vec![tag("#rust", 10)];
        top_up_trending_hashtags_with_fallback(&mut tags, 3, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 3);
        assert_eq!(tags[0].hashtag, "#rust");
        assert_eq!(tags[0].count, 10);
        assert_eq!(tags[1].hashtag, "#hashiverse");
        assert_eq!(tags[1].count, 0);
        assert_eq!(tags[2].hashtag, "#news");
        assert_eq!(tags[2].count, 0);
    }

    #[test]
    fn top_up_skips_fallback_already_present_exact_match() {
        // A fallback hashtag already present as a real entry is not added twice.
        let mut tags = vec![tag("#hashiverse", 42)];
        top_up_trending_hashtags_with_fallback(&mut tags, 3, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].hashtag, "#hashiverse");
        assert_eq!(tags[0].count, 42, "real trending entry must not be overwritten by filler");
        assert_eq!(tags[1].hashtag, "#news");
        assert_eq!(tags[1].count, 0);
    }

    #[test]
    fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
        // Dedup matches regardless of case and a leading '#'.
        let mut tags = vec![tag("HashiVerse", 7)];
        top_up_trending_hashtags_with_fallback(&mut tags, 3, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].hashtag, "HashiVerse");
        assert_eq!(tags[1].hashtag, "#news");
    }

    #[test]
    fn top_up_with_empty_fallback_is_noop() {
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 5, &[]);
        assert_eq!(tags.len(), 0);
    }

    #[test]
    fn top_up_handles_zero_limit() {
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 0, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 0);
    }

    #[test]
    fn top_up_exhausts_fallback_without_reaching_limit() {
        let mut tags: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut tags, 10, &["#hashiverse", "#news"]);
        assert_eq!(tags.len(), 2, "should stop at the end of the fallback list, not pad further");
    }
}
1361