Skip to main content

hashiverse_server_lib/server/handlers/
dispatch.rs

1//! # Inbound RPC dispatch loop
2//!
3//! The hot path of the server: a single async loop that drains `IncomingRequest`s
4//! from the transport's `mpsc::Receiver`, decodes each packet via
5//! [`hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx`], and routes
6//! by [`hashiverse_lib::protocol::payload::payload::PayloadRequestKind`] to the
7//! correct per-op handler (bootstrap, announce, get/submit/heal/cache post
8//! bundles, get/submit/heal/cache feedback, fetch URL preview, trending
9//! hashtags, ping).
10//!
11//! Per-request safety checks happen before any real work:
12//!
13//! - **PoW verification** — the packet's PoW must be sufficient for *this* server's
14//!   identity. Anything under-powered or stale is dropped immediately.
15//! - **Replay protection** — a short-lived salt cache rejects salts we've already
16//!   seen, so a valid signed request can't be replayed from another network vantage.
17//! - **Peer upgrade** — if the caller's embedded [`hashiverse_lib::protocol::peer::Peer`] carries a stronger PoW
18//!   than what we have in the tracker, the tracker is upgraded in place.
19//!
20//! The loop respects a `CancellationToken` so graceful shutdown drains in-flight
21//! work and stops accepting new requests cleanly.
22
23use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29    AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30    GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31    HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1,
32    SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
44use hashiverse_lib::tools::types::{Id, Signature};
45use hashiverse_lib::tools::{hashing, url_preview};
46use hashiverse_lib::tools::{config, json, BytesGatherer};
47use hashiverse_lib::transport::transport::IncomingRequest;
48use log::{info, trace, warn};
49use std::collections::HashSet;
50use tokio::sync::mpsc;
51use tokio_util::sync::CancellationToken;
52
/// Seed hashtags appended to a trending-hashtags response when the server knows
/// fewer real trending hashtags than the caller asked for.
///
/// Entries are tried in the order listed. An entry is skipped when its normalised
/// form is already in the real trending list, and every filler is emitted with
/// `count = 0` so that clients can tell fillers apart from genuine trending data.
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &[
    "hashiverse",
    "news",
];
59
/// Produce the comparison form of a hashtag: the lowercased text with at most one
/// leading `#` removed.
///
/// Only a single leading `#` is dropped, so `"##Tag"` becomes `"#tag"`.
/// Intended to mirror the canonicalisation performed by `Id::from_hashtag_str`.
fn normalise_hashtag(hashtag: &str) -> String {
    let mut normalised = hashtag.to_lowercase();
    if normalised.starts_with('#') {
        // '#' is a single byte, so removing index 0 drops exactly that character.
        normalised.remove(0);
    }
    normalised
}
69
70/// Top up `trending_hashtags` from `fallback_hashtags` (in order) until it reaches
71/// `limit`, skipping any fallback whose normalised form already appears in the list.
72/// Filler entries are inserted with `count = 0`. No-op if the list is already at
73/// or above the limit.
74fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
75    let target_length = limit as usize;
76    if trending_hashtags.len() >= target_length {
77        return;
78    }
79
80    let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
81        .map(|entry| normalise_hashtag(&entry.hashtag))
82        .collect();
83
84    for fallback_hashtag in fallback_hashtags {
85        if trending_hashtags.len() >= target_length {
86            break;
87        }
88        let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
89        if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
90            continue;
91        }
92        trending_hashtags.push(TrendingHashtagV1 {
93            hashtag: (*fallback_hashtag).to_string(),
94            count: 0,
95        });
96        existing_normalised_hashtags.insert(normalised_fallback_hashtag);
97    }
98}
99
100impl HashiverseServer {
101    pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
102        loop {
103            tokio::select! {
104                _ = cancellation_token.cancelled() => { break },
105
106                receipt = rx.recv() => {
107                    match receipt {
108                        Some(incoming) => {
109                            // trace!("dispatch_network_envelopes received bytes={:?}", incoming.bytes);
110                            let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
111                            match result {
112                                Ok(bytes) => {
113                                    let result = incoming.reply.send(bytes);
114                                    if result.is_err() { warn!("failed to send reply"); }
115                                },
116                                Err(e) => {
117                                    warn!("failed to process packet from {}: {}", incoming.caller_address, e);
118                                    incoming.report_bad_request();
119                                    drop(incoming.reply);
120                                },
121                            }
122                        },
123                        None => {
124                            warn!("channel closed");
125                            break;
126                        }
127                    }
128                }
129            }
130        }
131
132        Ok(())
133    }
134
135    async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
136        let caller_address = incoming.caller_address.as_str();
137        let current_time_millis = self.runtime_services.time_provider.current_time_millis();
138
139        // Decode the envelope
140        let rpc_request_packet_rx = RpcRequestPacketRx::decode(&current_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
141        // trace!("payload_request_kind={}", rpc_request_packet_rx.payload_request_kind);
142
143        // Check that we have not seen this salt recently (stops replay attacks)
144        {
145            if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
146                anyhow::bail!("replay detected: salt already seen");
147            }
148            self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
149        }
150
151        // Keep this for our response
152        let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
153
154        let dispatch_result: anyhow::Result<BytesGatherer> = try {
155            // Check that the pow is meaningful
156            let pow = match rpc_request_packet_rx.pow_server_known {
157                true => {
158                    let (pow, improved_pow_current_day, improved_pow_current_month) = {
159                        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
160                        let pow = PeerPow::new(
161                            rpc_request_packet_rx.pow_sponsor_id,
162                            &peer_self.verification_key_bytes,
163                            &peer_self.pq_commitment_bytes,
164                            rpc_request_packet_rx.pow_timestamp,
165                            rpc_request_packet_rx.pow_content_hash,
166                            rpc_request_packet_rx.pow_salt,
167                        )?;
168
169                        let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
170                        let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
171
172                        (pow, improved_pow_current_day, improved_pow_current_month)
173                    };
174
175                    // Check if we need to modify peer_self
176                    if improved_pow_current_day || improved_pow_current_month {
177                        let mut peer_self = self.peer_self.write(); // Remember alphabetical locking order!
178                        if improved_pow_current_day {
179                            trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
180                            peer_self.pow_current_day = pow.clone();
181                        }
182                        if improved_pow_current_month {
183                            trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
184                            peer_self.pow_current_month = pow.clone();
185                        }
186
187                        peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
188                    }
189
190                    Some(pow)
191                }
192
193                false => {
194                    // Only some request types are allowed anonymous pow
195                    match rpc_request_packet_rx.payload_request_kind {
196                        PayloadRequestKind::BootstrapV1 => {}
197                        _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
198                    }
199
200                    None
201                }
202            };
203
204            // Dispatch appropriately
205            let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
206            let response_flags = match compress_response {
207                true => RpcResponsePacketTxFlags::COMPRESSED,
208                false => RpcResponsePacketTxFlags::empty(),
209            };
210
211            // Encode response
212            RpcResponsePacketTx::encode(
213                &self.server_id.keys.signature_key,
214                &self.server_id.keys.verification_key_bytes,
215                &self.server_id.keys.pq_commitment_bytes,
216                &self.server_id.sponsor_id,
217                &self.server_id.timestamp,
218                &self.server_id.hash,
219                &self.server_id.salt,
220                &pow_content_hash,
221                response_flags,
222                payload_response_kind,
223                payload,
224            )?
225        };
226
227        match dispatch_result {
228            Ok(results) => Ok(results),
229            Err(e) => {
230                warn!("failed to dispatch packet from {}: {}", caller_address, e);
231                incoming.report_bad_request();
232
233                let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
234                let response = ErrorResponseV1 { code: 0, message: e.to_string() };
235                let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
236
237                // Encode response
238                RpcResponsePacketTx::encode(
239                    &self.server_id.keys.signature_key,
240                    &self.server_id.keys.verification_key_bytes,
241                    &self.server_id.keys.pq_commitment_bytes,
242                    &self.server_id.sponsor_id,
243                    &self.server_id.timestamp,
244                    &self.server_id.hash,
245                    &self.server_id.salt,
246                    &pow_content_hash,
247                    RpcResponsePacketTxFlags::COMPRESSED,
248                    payload_response_kind,
249                    payload,
250                )
251            }
252        }
253    }
254
255    async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
256        // Where do we want to decide if we should compress?  Here in one block, or at the end of each individual dispatch_xxx?
257        let compress_response = match rpc_request_packet_rx.payload_request_kind {
258            PayloadRequestKind::GetPostBundleV1 => false,   // We don't compress these again as they are already predominantly compressed
259            PayloadRequestKind::CachePostBundleV1 => false, // We don't compress these again as they are already predominantly compressed
260            _ => true,
261        };
262
263        let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
264            PayloadRequestKind::ErrorV1 => {
265                anyhow::bail!("Received ErrorV1");
266            }
267            PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
268            PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
269            PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
270            PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
271            PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
272            PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
273            PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
274            PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
275            PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
276            PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
277            PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
278            PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
279            PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
280            PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
281            PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
282        };
283
284        Ok((compress_response, payload_response_kind, payload))
285    }
286
287    #[allow(non_snake_case)]
288    async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
289        anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
290        let peer = self.peer_self.read().clone();
291        let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
292        Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
293    }
294
295    #[allow(non_snake_case)]
296    async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
297        anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
298        let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
299        let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
300        Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
301    }
302
303    #[allow(non_snake_case)]
304    async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
305        anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
306
307        let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
308        // trace!("received AnnounceV1 from peer={}", request.peer_self);
309
310        let peer = request.peer_self;
311        let peer_id = peer.id;
312
313        // Check that this Peer checks out and add it to our kademlia
314        self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
315
316        let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
317
318        let json = json::struct_to_bytes(&AnnounceResponseV1 {
319            peer_self: self.peer_self.read().clone(),
320            peers_nearest,
321        })?;
322        Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
323    }
324
325    #[allow(non_snake_case)]
326    async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
327        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);
328
329        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
330
331        let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
332        trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);
333
334        // Check that the location_id makes sense
335        request.bucket_location.validate()?;
336
337        // Store the provided Peers
338        {
339            for peer in request.peers_visited {
340                self.add_potential_peer_to_kademlia(peer, time_millis).await;
341            }
342        }
343
344        let peer_self = self.peer_self.read().clone();
345
346        // Check our own records to see that we are close enough to store this post
347        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
348        if !among_peers_nearer {
349            warn!("I am not in peers_nearer {}", peer_self);
350        }
351
352        let post_bundle = match among_peers_nearer {
353            true => {
354                // At some point, for parallelisms sake, we may wish to replace this with a read lock followed by a release and write lock and recheck
355                // But then again - if our server is getting load, the federated cache mechanism should alleviate it, so this may be overkill...
356                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
357
358                let mut encoded_post_bundle_bytes: Option<Bytes> = None;
359
360                // Try to load bytes from the disk (if we even have them)
361                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
362                if let Some(mut post_bundle_metadata) = post_bundle_metadata {
363                    encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
364
365                    // Has the PostBundle become sealed since the last time it was written to disk?  Perhaps enough time has passed
366                    if !post_bundle_metadata.sealed {
367                        let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
368                        if sealed {
369                            // We can rewrite the postbundle on disk for the final time now that it is sealed
370                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
371                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
372                                encoded_post_bundle.header.time_millis = time_millis;
373                                encoded_post_bundle.header.sealed = true;
374                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
375                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
376                                self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
377                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
378                            }
379
380                            // And the updated metadata
381                            post_bundle_metadata.sealed = true;
382                            self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
383                        }
384                        else {
385                            // We must simply update the timestamp - we need this for client caching.  It's expensive, but caching should help us out here if it is a truly busy bucket
386                            if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
387                                let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
388                                encoded_post_bundle.header.time_millis = time_millis;
389                                encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
390                                let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
391                                encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
392                            }
393                        }
394                    }
395                };
396
397                // If we dont have the metadata, or the bytes on disk, return a fresh one
398                // Generally if we have the metadata, we should always have bytes on disk, except if a request ot post was granted but they then never came back with the data...
399                if encoded_post_bundle_bytes.is_none() {
400                    let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
401
402                    let mut header = EncodedPostBundleHeaderV1 {
403                        time_millis,
404                        location_id: request.bucket_location.location_id,
405                        overflowed: false,
406                        sealed,
407                        num_posts: 0,
408                        encoded_post_ids: vec![],
409                        encoded_post_lengths: vec![],
410                        encoded_post_healed: HashSet::new(),
411                        peer: peer_self.clone(),
412                        signature: Signature::zero(),
413                    };
414                    header.signature_generate(&self.server_id.keys.signature_key)?;
415
416                    let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
417                    encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
418                }
419
420                encoded_post_bundle_bytes
421            }
422            false => None,
423        };
424
425        let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
426
427        let get_post_bundle_response = GetPostBundleResponseV1 {
428            peers_nearer,
429            cache_request_token: cache_result.cache_request_token,
430            post_bundles_cached: cache_result.cached_items,
431            post_bundle,
432        };
433        Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
434    }
435
436    #[allow(non_snake_case)]
437    async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
438        anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
439
440        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
441
442        let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
443        trace!("received GetPostBundleFeedbackV1");
444
445        // Store the provided Peers
446        {
447            for peer in request.peers_visited {
448                self.add_potential_peer_to_kademlia(peer, time_millis).await;
449            }
450        }
451
452        // Check our own records to see that we are close enough to store this post
453        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
454
455        let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
456
457        if among_peers_nearer {
458            // We only support feedbacks if we know about this post_bundle
459            let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
460            if post_bundle_metadata.is_some() {
461                post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
462            }
463        }
464
465        // Wrap the feedbacks with a header (if we have any)
466        let peer_self = self.peer_self.read().clone();
467        let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
468            Some(feedbacks_bytes) => {
469
470                let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
471
472                let mut header = EncodedPostBundleFeedbackHeaderV1 {
473                    time_millis,
474                    location_id: request.bucket_location.location_id,
475                    feedbacks_bytes_hash,
476                    peer: peer_self.clone(),
477                    signature: Signature::zero(),
478                };
479                header.signature_generate(&self.server_id.keys.signature_key);
480
481                let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
482                    header,
483                    feedbacks_bytes,
484                };
485                Some(encoded_post_bundle_feedback.to_bytes()?)
486            }
487            None => None,
488        };
489
490        let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
491
492        let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
493            peers_nearer,
494            cache_request_token: cache_result.cache_request_token,
495            post_bundle_feedbacks_cached: cache_result.cached_items,
496            encoded_post_bundle_feedback,
497        };
498        Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
499    }
500
501    #[allow(non_snake_case)]
502    async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
503        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
504
505        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
506
507        let pow = match pow {
508            Some(pow) => pow,
509            None => anyhow::bail!("We need pow for a submit post claim"),
510        };
511
512        let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
513        trace!("received SubmitPostClaimV1");
514
515        // Check that the location_id makes sense
516        request.bucket_location.validate()?;
517
518        // Check that we support the bucket duration
519        let bucket_duration = {
520            let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
521            match bucket_duration {
522                Some(bucket_duration) => *bucket_duration,
523                None => anyhow::bail!("Unrecognised bucket duration provided"),
524            }
525        };
526
527        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
528
529        // Check that enough pow has been done for this post
530        {
531            let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
532            if pow.pow < pow_minimum {
533                anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
534            }
535        }
536
537        // Check that the post matches the bucket
538        {
539            // Ensure that the bucket timestamp fits the post
540            let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
541            if timestamp != request.bucket_location.bucket_time_millis {
542                anyhow::bail!("The post timestamp does not match the bucket");
543            }
544        }
545
546        let client_id = decoded_post.header.client_id()?;
547
548        // Ensure that the base_id is related to the post in the linked_base_ids.
549        if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
550            anyhow::bail!("The base_id is not related to the post");
551        }
552
553        // Check that only the posting user is allowed to post to a bucket of type USER
554        if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
555            anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
556        }
557
558        // For ReplyToPost and Sequel buckets, verify the referenced post is real (valid signature).
559        // For Sequel buckets, additionally verify same-author.
560        if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
561            let original_header_bytes = request.referenced_post_header_bytes
562                .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
563
564            // Decode the original post header using the submitter's client_id as the decryption password.
565            // decode_from_bytes verifies the signature — a forged header will fail here.
566            let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
567
568            // Verify the original post's post_id matches the bucket's base_id
569            if original_post.post_id != request.bucket_location.base_id {
570                anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
571            }
572
573            // For Sequel buckets, additionally verify the sequel author matches the original post author
574            if request.bucket_location.bucket_type == BucketType::Sequel {
575                let original_client_id = original_post.header.client_id()?;
576                if original_client_id != client_id {
577                    anyhow::bail!("Sequel post author does not match original post author");
578                }
579            }
580        }
581
582        // Check that the post timestamp is reasonable
583        {
584            let delta = (time_millis - decoded_post.header.time_millis).abs();
585            if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
586                anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
587            }
588        }
589
590        // Check our own records to see that we are close enough to store this post
591        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
592
593        let submit_post_claim_token = match among_peers_nearer {
594            true => {
595                // Are we still willing to accept this post?
596                let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
597                let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
598                let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
599
600                // If it is not yet sealed, update our metadata
601                if !post_bundle_metadata.sealed {
602                    post_bundle_metadata.num_posts_granted += 1;
603                    post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
604                    post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
605
606                    self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
607                }
608
609                // We grant the token if we are not yet sealed
610                match post_bundle_metadata.sealed {
611                    false => {
612                        info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
613                        Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
614                    }
615                    true => {
616                        info!(
617                            "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
618                            request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
619                        );
620                        None
621                    }
622                }
623            }
624
625            false => None,
626        };
627
628        // If we are willing to accept this post, then we are willing to track the trendiness of its hashtags
629        if submit_post_claim_token.is_some() {
630            // Track referenced hashtags for trending (only for User bucket posts, where the hashtags originate)
631            if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
632                let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
633                for referenced_hashtag in &request.referenced_hashtags {
634                    let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
635                        Ok(id) => id,
636                        Err(_) => continue, // Skip invalid hashtags silently
637                    };
638                    if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
639                        continue; // Hashtag not backed by a linked_base_id (no PoW), skip it
640                    }
641                    let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_else(HyperLogLog::new);
642                    hll.insert(author_verification_key_bytes.as_ref());
643                    self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
644                }
645            }
646        }
647
648        let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
649        Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
650    }
651
652    #[allow(non_snake_case)]
653    async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
654        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
655
656        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
657
658        let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
659        trace!("received SubmitPostCommitV1");
660
661        let peer_self = self.peer_self.read(); // Remember alphabetical locking order!
662
663        // Is the submit_post_claim_token from us?
664        if request.submit_post_claim_token.peer.id != peer_self.id {
665            anyhow::bail!("The submit_post_claim_token is not from us");
666        }
667
668        // Check that the location_id still makes sense
669        request.bucket_location.validate()?;
670        if request.bucket_location != request.submit_post_claim_token.bucket_location {
671            anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
672        }
673
674        // Check that we can decode the post with the bucket's base_id as password
675        let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
676
677        // Check that the committed post matches what was claimed
678        if decoded_post.post_id != request.submit_post_claim_token.post_id {
679            anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
680        }
681
682        // Update the postbundle and metadata
683        let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
684        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
685        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
686
687        // We should always have the metadata, but just in case!
688        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
689
690        // Make a PostBundle if we dont have one on disk
691        let mut post_bundle = match post_bundle_bytes {
692            Some(bytes) => {
693                let bytes = Bytes::from_owner(bytes);
694                let bundle = EncodedPostBundleV1::from_bytes(bytes, true)?;
695                bundle
696            }
697            None => {
698                let header = EncodedPostBundleHeaderV1 {
699                    time_millis: TimeMillis::zero(),
700                    location_id: request.bucket_location.location_id,
701                    overflowed: false,
702                    sealed: false,
703                    num_posts: 0,
704                    encoded_post_ids: vec![],
705                    encoded_post_lengths: vec![],
706                    encoded_post_healed: HashSet::new(),
707                    peer: self.peer_self.read().clone(),
708                    signature: Signature::zero(),
709                };
710
711                EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
712            }
713        };
714
715        // Sanity check that we don't already have this post
716        if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
717            anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
718        }
719
720        // The post bundle
721        post_bundle.header.time_millis = time_millis;
722        post_bundle.header.num_posts += 1;
723        post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
724        post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
725        post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
726        post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
727        post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
728        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
729        posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
730        post_bundle.encoded_posts_bytes = posts_mut.freeze();
731        let post_bundle_bytes_new = post_bundle.to_bytes()?;
732
733        // The metadata
734        post_bundle_metadata.num_posts = post_bundle.header.num_posts;
735        post_bundle_metadata.overflowed = post_bundle.header.overflowed;
736        post_bundle_metadata.sealed = post_bundle.header.sealed;
737        post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
738
739        {
740            self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
741            self.environment
742                .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
743        }
744
745        info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
746
747        let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
748
749        let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
750        Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
751    }
752
753    #[allow(non_snake_case)]
754    async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
755        anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
756
757        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
758
759        let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
760        trace!("received SubmitPostFeedbackV1");
761
762        // Enforce minimum PoW for feedback submission
763        let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
764        if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
765            anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
766        }
767
768        // Check the feedback makes sense
769        request.encoded_post_feedback.pow_verify()?;
770
771        let location_id = request.bucket_location.location_id;
772
773        // Check our own records to see that we are close enough to store this post feedback
774        let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
775
776        let accepted = (|| -> anyhow::Result<bool> {
777            if !among_peers_nearer {
778                return Ok(false);
779            }
780
781            // Do we recognise the post associated with this feedback
782            let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
783            let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
784            else {
785                return Ok(false);
786            };
787
788            let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
789            if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
790                return Ok(false);
791            }
792
793            Ok(true)
794        })()?;
795
796        // Update our environment with the feedback
797        if accepted {
798            trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
799            self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
800        }
801
802        let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
803        Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
804    }
805
806    #[allow(non_snake_case)]
807    async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
808        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
809
810        fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
811            let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
812            Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
813        }
814
815        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
816        let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
817        trace!("received HealPostBundleClaimV1");
818
819        // Verify the bucket_location is internally consistent and maps to the donor_header's location_id
820        request.bucket_location.validate()?;
821        if request.bucket_location.location_id != request.donor_header.location_id {
822            anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
823        }
824
825        // Verify the donor_header provided by the client is self-consistent and properly signed
826        request.donor_header.verify()?;
827
828        // Only heal if we are among the nearest peers for this location
829        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
830        if !among_peers_nearer {
831            return generate_negatory_response();
832        }
833
834        // Reject if a heal for this location is already in progress (another client beat us to it)
835        if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
836            return generate_negatory_response();
837        }
838
839        // Load our current bundle (header only) to see what post_ids we already have
840        let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
841        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
842
843        let our_post_ids: HashSet<Id> = match post_bundle_bytes {
844            Some(bytes) => {
845                let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
846                bundle.header.encoded_post_ids.into_iter().collect()
847            }
848            None => HashSet::new(),
849        };
850
851        // Posts in the donor_header that we do not yet have, in canonical order
852        let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
853
854        if needed_post_ids.is_empty() {
855            return generate_negatory_response();
856        }
857
858        self.heal_in_progress.insert(request.donor_header.location_id, ());
859
860        let token = Some(HealPostBundleClaimTokenV1::new(
861            self.peer_self.read().clone(),
862            request.bucket_location,
863            needed_post_ids.clone(),
864            request.donor_header.signature,
865            &self.server_id.keys.signature_key,
866        ));
867        let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
868        Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
869    }
870
871    #[allow(non_snake_case)]
872    async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
873        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
874
875        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
876        let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
877        trace!("received HealPostBundleCommitV1");
878
879        // Verify the token was issued by this server
880        let peer_self = self.peer_self.read().clone();
881        if request.token.peer.id != peer_self.id {
882            anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
883        }
884        request.token.verify()?;
885
886        // Verify the donor_header matches what the token was issued for
887        if request.donor_header.signature != request.token.donor_header_signature {
888            anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
889        }
890        request.donor_header.verify()?;
891
892        if request.token.bucket_location.location_id != request.donor_header.location_id {
893            anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
894        }
895
896        let location_id = request.donor_header.location_id;
897
898        // Parse the post bytes supplied by the client (one entry per token.needed_post_id, in order)
899        let mut remaining_bytes = request.encoded_posts_bytes.clone();
900        let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
901        for post_id in &request.token.needed_post_ids {
902            let len = request
903                .donor_header
904                .encoded_post_ids
905                .iter()
906                .zip(request.donor_header.encoded_post_lengths.iter())
907                .find(|(id, _)| *id == post_id)
908                .map(|(_, len)| *len)
909                .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
910            if remaining_bytes.len() < len {
911                anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
912            }
913            let post_bytes = remaining_bytes.split_to(len);
914            posts_to_add.push((*post_id, post_bytes));
915        }
916        if !remaining_bytes.is_empty() {
917            anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
918        }
919
920        // Validate each post decrypts successfully with the provided base_id
921        for (post_id, post_bytes) in &posts_to_add {
922            EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
923        }
924
925        // Load our current bundle (or start empty)
926        let _lock = self.environment.get_write_lock_for_location_id(&location_id);
927        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
928        let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
929        let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
930
931        let mut post_bundle = match post_bundle_bytes {
932            Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
933            None => EncodedPostBundleV1 {
934                header: EncodedPostBundleHeaderV1 {
935                    time_millis: TimeMillis::zero(),
936                    location_id,
937                    overflowed: request.donor_header.overflowed,
938                    sealed: request.donor_header.sealed,
939                    num_posts: 0,
940                    encoded_post_ids: vec![],
941                    encoded_post_lengths: vec![],
942                    encoded_post_healed: HashSet::new(),
943                    peer: peer_self.clone(),
944                    signature: Signature::zero(),
945                },
946                encoded_posts_bytes: Bytes::new(),
947            },
948        };
949
950        let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
951
952        let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
953        let mut added_any = false;
954        for (post_id, post_bytes) in posts_to_add {
955            if !our_post_ids.contains(&post_id) {
956                let len = post_bytes.len();
957                posts_mut.extend_from_slice(&post_bytes);
958                post_bundle.header.encoded_post_ids.push(post_id);
959                post_bundle.header.encoded_post_lengths.push(len);
960                post_bundle.header.encoded_post_healed.insert(post_id);
961                added_any = true;
962            }
963        }
964        post_bundle.encoded_posts_bytes = posts_mut.freeze();
965
966        if added_any {
967            post_bundle.header.time_millis = time_millis;
968            post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
969            post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
970
971            let new_bytes = post_bundle.to_bytes()?;
972            post_bundle_metadata.num_posts = post_bundle.header.num_posts;
973            post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
974
975            self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
976            self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
977
978            info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
979        }
980
981        self.heal_in_progress.invalidate(&location_id);
982
983        let response = HealPostBundleCommitResponseV1 { accepted: added_any };
984        Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
985    }
986
987    #[allow(non_snake_case)]
988    async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
989        anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
990
991        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
992        let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
993        trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
994
995        // Only accept if we are among the nearest peers for this location
996        let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
997        if !among_peers_nearer {
998            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
999            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1000        }
1001
1002        // Check we actually have a post bundle for this location (same guard as SubmitPostFeedbackV1)
1003        let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1004        let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1005        let Some(post_bundle_bytes) = post_bundle_bytes
1006        else {
1007            let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1008            return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1009        };
1010        let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1011
1012        let mut accepted_count: u32 = 0;
1013        for feedback in &request.encoded_post_feedbacks {
1014            // Only store feedback for posts we actually hold
1015            if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1016                continue;
1017            }
1018            self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1019            accepted_count += 1;
1020        }
1021
1022        if accepted_count > 0 {
1023            trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1024        }
1025
1026        let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1027        Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1028    }
1029
1030    #[allow(non_snake_case)]
1031    async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1032        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1033
1034        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1035        let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1036        trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1037
1038        // Verify token was issued by this server and has not expired
1039        let peer_self = self.peer_self.read().clone();
1040        if request.token.peer.id != peer_self.id {
1041            anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1042        }
1043        request.token.verify()?;
1044        if request.token.is_expired(time_millis) {
1045            anyhow::bail!("CachePostBundleV1: token has expired");
1046        }
1047
1048        let mut any_accepted = false;
1049        for bundle_bytes in request.encoded_post_bundles {
1050            let parse_result: anyhow::Result<()> = try {
1051                let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1052
1053                // Check that this proposed cache content is legitimate
1054                encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1055
1056                let originator_peer_id = encoded_post_bundle.header.peer.id;
1057                let is_sealed = encoded_post_bundle.header.sealed;
1058                if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1059                    any_accepted = true;
1060                }
1061            };
1062            if let Err(e) = &parse_result {
1063                warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1064            }
1065        }
1066        let response = CachePostBundleResponseV1 { accepted: any_accepted };
1067        Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1068    }
1069
1070    #[allow(non_snake_case)]
1071    async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1072        anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1073
1074        let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1075        let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1076        trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1077
1078        // Verify token was issued by this server and has not expired
1079        let peer_self = self.peer_self.read().clone();
1080        if request.token.peer.id != peer_self.id {
1081            anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1082        }
1083        request.token.verify()?;
1084        if request.token.is_expired(time_millis) {
1085            anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1086        }
1087
1088        let result: anyhow::Result<bool> = try {
1089            let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1090
1091            encoded_post_bundle_feedback.verify()?;
1092
1093            let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1094            // Feedback bundles have no sealed flag — treat as live (5-min TTL)
1095            self.post_bundle_feedback_cache
1096                .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1097        };
1098        let accepted = result.unwrap_or_else(|e| {
1099            warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1100            false
1101        });
1102
1103        let response = CachePostBundleFeedbackResponseV1 { accepted };
1104        Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1105    }
1106
1107    #[allow(non_snake_case)]
1108    async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1109        anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
1110
1111        let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1112        trace!("received FetchUrlPreviewV1 for url={}", request.url);
1113
1114        // SSRF protection:
1115        // https:// only — blocks plaintext metadata endpoints and forces TLS cert validation.
1116        if !request.url.starts_with("https://") {
1117            anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
1118        }
1119
1120        // Extract host; reject bare IP literals (IPv4 and IPv6 bracket form).
1121        let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
1122        let host = if host_and_port.starts_with('[') {
1123            // IPv6 literal [addr] or [addr]:port
1124            host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
1125        } else {
1126            // hostname or IPv4 — strip optional port
1127            host_and_port.split(':').next().unwrap_or(host_and_port)
1128        };
1129        if host.is_empty() {
1130            anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
1131        }
1132        if host.parse::<std::net::IpAddr>().is_ok() {
1133            anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
1134        }
1135
1136        // Resolve once and validate every returned address.  Collecting into a Vec lets us
1137        // re-use the same addresses to pin the reqwest client, closing the DNS-rebinding window
1138        // (nip.io, metadata.google.internal, TTL=0 rebind, etc.).
1139        let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
1140            .await
1141            .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
1142            .collect();
1143        if resolved_socket_addrs.is_empty() {
1144            anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
1145        }
1146        for socket_addr in &resolved_socket_addrs {
1147            let ip = socket_addr.ip();
1148            if is_ssrf_protected_ip(ip) {
1149                anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
1150            }
1151        }
1152
1153        // Build a client that:
1154        //   - resolve_to_addrs: skips re-resolution entirely — the validated IPs are used directly
1155        //   - redirect::none: prevents a server-side redirect to an unvalidated internal URL
1156        //   - no_proxy: ignores HTTP_PROXY / NO_PROXY env vars that could route to internal hosts
1157        //   - times out quickly in unreasonable scenarios
1158        //   - limits download size
1159        let http_client = reqwest::Client::builder()
1160            .connect_timeout(std::time::Duration::from_secs(1))
1161            .timeout(std::time::Duration::from_secs(3))
1162            .user_agent("hashiverse-preview/1.0")
1163            .resolve_to_addrs(host, &resolved_socket_addrs)
1164            .redirect(reqwest::redirect::Policy::none())
1165            .no_proxy()
1166            .build()?;
1167
1168        const URL_FETCH_MAX_BODY_BYTES: usize = 512 * 1024;
1169        let mut http_response = http_client.get(&request.url).send().await?;
1170
1171        // Reject early if Content-Length already exceeds the limit — before reading any body bytes.
1172        if let Some(content_length) = http_response.content_length() {
1173            if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
1174                anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
1175            }
1176        }
1177        let mut body_bytes = BytesMut::new();
1178        while let Some(chunk) = http_response.chunk().await? {
1179            // body_bytes.len() is already within the limit; only the new chunk can overflow.
1180            // We copy only as many bytes as fit rather than bailing, so a single large chunk
1181            // (which reqwest has already buffered) doesn't cause an error — we just truncate.
1182            // This is sufficient for HTML preview extraction which only needs the <head>.
1183            let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
1184            body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
1185            if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
1186                break;
1187            }
1188        }
1189        let html = String::from_utf8_lossy(&body_bytes).into_owned();
1190
1191        let preview_data = url_preview::extract_url_preview(&html);
1192
1193        let response = FetchUrlPreviewResponseV1 {
1194            url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
1195            title: preview_data.title,
1196            description: preview_data.description,
1197            image_url: preview_data.image_url,
1198        };
1199
1200        Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1201    }
1202
1203    #[allow(non_snake_case)]
1204    async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1205        anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1206
1207        let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1208        trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1209
1210        let time_millis = self.runtime_services.time_provider.current_time_millis();
1211
1212        // Check if we have a cached response that is less than a few minutes old
1213        let cached_response = {
1214            let cache = self.trending_hashtags_response_cache.lock();
1215            match cache.as_ref() {
1216                Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1217                    Some(cached_response.clone())
1218                }
1219                _ => None,
1220            }
1221        };
1222
1223        let mut response = match cached_response {
1224            Some(mut cached) => {
1225                cached.trending_hashtags.truncate(request.limit as usize);
1226                cached
1227            }
1228            None => {
1229                // Recalculate: iterate the trending_hashtags cache, sort by unique author count
1230                let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1231                    .map(|(hashtag, hll)| TrendingHashtagV1 {
1232                        hashtag: hashtag.as_ref().clone(),
1233                        count: hll.count(),
1234                    })
1235                    .filter(|entry| entry.count > 0)
1236                    .collect();
1237
1238                trending_hashtags.sort_by(|a, b| b.count.cmp(&a.count));
1239
1240                let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1241
1242                // Cache the full response (real trending only — fallbacks are applied per-response after truncation)
1243                {
1244                    let mut cache = self.trending_hashtags_response_cache.lock();
1245                    *cache = Some((time_millis, full_response.clone()));
1246                }
1247
1248                let mut truncated_response = full_response;
1249                truncated_response.trending_hashtags.truncate(request.limit as usize);
1250                truncated_response
1251            }
1252        };
1253
1254        top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1255
1256        Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1257    }
1258}
1259
#[cfg(test)]
mod tests {
    //! Unit tests for `top_up_trending_hashtags_with_fallback`.

    use super::*;

    /// Builds a `TrendingHashtagV1` with the given hashtag text and count.
    fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
        TrendingHashtagV1 { count, hashtag: hashtag.to_owned() }
    }

    /// Projects the entries onto their hashtag strings, preserving order.
    fn hashtags_of(entries: &[TrendingHashtagV1]) -> Vec<&str> {
        entries.iter().map(|entry| entry.hashtag.as_str()).collect()
    }

    /// Projects the entries onto their counts, preserving order.
    fn counts_of(entries: &[TrendingHashtagV1]) -> Vec<u64> {
        entries.iter().map(|entry| entry.count).collect()
    }

    #[test]
    fn top_up_adds_fallback_when_empty() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 5, &["#hashiverse", "#news"]);
        // Both fallbacks are appended, in order, each with a zero count.
        assert_eq!(hashtags_of(&entries), ["#hashiverse", "#news"]);
        assert_eq!(counts_of(&entries), [0u64, 0u64]);
    }

    #[test]
    fn top_up_respects_limit_smaller_than_fallback_list() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 1, &["#hashiverse", "#news"]);
        assert_eq!(hashtags_of(&entries), ["#hashiverse"]);
    }

    #[test]
    fn top_up_preserves_fallback_order() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#first", "#second", "#third"]);
        for (index, expected) in ["#first", "#second", "#third"].iter().enumerate() {
            assert_eq!(entries[index].hashtag, *expected);
        }
    }

    #[test]
    fn top_up_is_noop_when_already_at_limit() {
        let mut entries = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
        top_up_trending_hashtags_with_fallback(&mut entries, 2, &["#hashiverse", "#news"]);
        assert_eq!(hashtags_of(&entries), ["#rust", "#golang"]);
    }

    #[test]
    fn top_up_partially_fills_when_real_trending_exists() {
        let mut entries = vec![make_trending_hashtag("#rust", 10)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        // The real entry stays first with its count; fallbacks follow with zero counts.
        assert_eq!(hashtags_of(&entries), ["#rust", "#hashiverse", "#news"]);
        assert_eq!(counts_of(&entries), [10u64, 0u64, 0u64]);
    }

    #[test]
    fn top_up_skips_fallback_already_present_exact_match() {
        let mut entries = vec![make_trending_hashtag("#hashiverse", 42)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        assert_eq!(hashtags_of(&entries), ["#hashiverse", "#news"]);
        assert_eq!(entries[0].count, 42, "real trending entry must not be overwritten by filler");
        assert_eq!(entries[1].count, 0);
    }

    #[test]
    fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
        // An existing "HashiVerse" (mixed case, no leading `#`) must count as the fallback "#hashiverse".
        let mut entries = vec![make_trending_hashtag("HashiVerse", 7)];
        top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
        assert_eq!(hashtags_of(&entries), ["HashiVerse", "#news"]);
    }

    #[test]
    fn top_up_with_empty_fallback_is_noop() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 5, &[]);
        assert!(entries.is_empty());
    }

    #[test]
    fn top_up_handles_zero_limit() {
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 0, &["#hashiverse", "#news"]);
        assert!(entries.is_empty());
    }

    #[test]
    fn top_up_exhausts_fallback_without_reaching_limit() {
        // The limit exceeds what real trending plus fallback can supply between them.
        let mut entries: Vec<TrendingHashtagV1> = Vec::new();
        top_up_trending_hashtags_with_fallback(&mut entries, 10, &["#hashiverse", "#news"]);
        assert_eq!(entries.len(), 2, "should stop at the end of the fallback list, not pad further");
    }
}
1361