Skip to main content

hashiverse_lib/protocol/posting/
encoded_post_bundle.rs

1//! # `EncodedPostBundleV1` — the server-signed index of posts at a location
2//!
3//! A "post bundle" is a server's durable record of every post accepted into a given
4//! `(location_id, time bucket)` pair. This is the unit of replication, caching, and
5//! healing on the network — timelines fetch bundles, caches store bundles, and the
6//! two-phase heal protocol reconciles divergent bundles.
7//!
8//! The encoding is two parts glued together:
9//!
10//! - [`EncodedPostBundleHeaderV1`] — JSON-encoded, server-signed, enumerating the
11//!   `post_id`s and per-post body lengths the bundle contains, plus
12//!   `sealed`/`overflowed` flags, any `healed_ids`, and the publishing
13//!   [`crate::protocol::peer::Peer`] record. This is the bit that proves "server X
14//!   attests that these posts lived here at time T".
15//! - A concatenation of each [`crate::protocol::posting::encoded_post::EncodedPostV1`]
16//!   body, in the same order as the header's post id list.
17//!
18//! `verify()` is exhaustive: it checks the header signature, confirms the sum of
19//! header-declared lengths exactly spans the body, and decrypts each post to verify
20//! its own signature and that its plaintext `post_id` matches the header's claim.
21
22use std::collections::HashSet;
23use crate::{anyhow_assert_eq, anyhow_assert_ge};
24use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post::EncodedPostV1;
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, ID_BYTES, Signature, SignatureKey, VerificationKey};
28use crate::tools::{hashing, json, signing};
29use bytes::{Buf, BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::{Debug, Display};
32
33/// The server-signed index for all posts accumulated under one (location, time-bucket) key.
34///
35/// Posts on hashiverse are not stored one-by-one on the DHT: they are grouped into "bundles"
36/// keyed by a [`crate::tools::buckets::BucketLocation`] (a location_id plus a time bucket).
37/// An `EncodedPostBundleHeaderV1` is the header that a server publishes describing the
38/// bundle's current state — the list of post IDs it holds, their lengths, which ones have
39/// been healed (re-uploaded after loss), whether the bundle has overflowed (too many posts,
40/// go more granular), whether it is sealed (old enough that new posts are unlikely), and the
41/// [`Peer`] record of the server publishing it.
42///
43/// The whole header is signed by the publishing peer, which lets any client reading the
44/// bundle verify both the contents list and the identity of the hosting server before
45/// deciding to fetch individual posts.
46// Contains all the posts for a given location_id in a particular bucket
47#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
48pub struct EncodedPostBundleHeaderV1 {
49    pub time_millis: TimeMillis, // When was this bundle last updated
50    pub location_id: Id,
51    pub overflowed: bool, // This bucket has overflowed, so go more granular
52    pub sealed: bool,    // Enough time has passed that it is unlikely that new posts will be added to this cluster
53    pub num_posts: u8,
54    pub encoded_post_ids: Vec<Id>, 
55    pub encoded_post_lengths: Vec<usize>,
56    pub encoded_post_healed: HashSet<Id>,
57    pub peer: Peer,
58
59    pub signature: Signature, // Peer.sign(get_hash_for_signing())
60}
61
62impl EncodedPostBundleHeaderV1 {
63    pub fn get_hash_for_signing(&self) -> anyhow::Result<Hash> {
64        let time_millis_be = self.time_millis.encode_be();
65        let overflowed_bytes = [self.overflowed as u8];
66        let sealed_bytes = [self.sealed as u8];
67        let num_posts_bytes = [self.num_posts];
68        let encoded_post_lengths_be: Vec<[u8; 8]> = self.encoded_post_lengths.iter().map(|&l| (l as u64).to_be_bytes()).collect();
69        let peer_hash = self.peer.signature_hash_generate()?;
70
71        let mut hash_input: Vec<&[u8]> = vec![];
72
73        hash_input.push(time_millis_be.as_ref());
74        hash_input.push(self.location_id.as_ref());
75        hash_input.push(&overflowed_bytes);
76        hash_input.push(&sealed_bytes);
77        hash_input.push(&num_posts_bytes);
78        for encoded_post_id in &self.encoded_post_ids {
79            hash_input.push(encoded_post_id.as_ref());
80        }
81        for length_be in &encoded_post_lengths_be {
82            hash_input.push(length_be.as_ref());
83        }
84        let mut healed_ids_sorted: Vec<Id> = self.encoded_post_healed.iter().copied().collect();
85        healed_ids_sorted.sort();
86        for healed_id in &healed_ids_sorted {
87            hash_input.push(healed_id.as_ref());
88        }
89        hash_input.push(peer_hash.as_ref());
90
91        Ok(hashing::hash_multiple(&hash_input))
92    }
93
94    pub fn signature_generate(&mut self, signature_key: &SignatureKey) -> anyhow::Result<()> {
95        let hash = self.get_hash_for_signing()?;
96        self.signature = signing::sign(signature_key, hash.as_ref());
97        Ok(())
98    }
99
100    pub fn signature_verify(&self) -> anyhow::Result<()> {
101        let hash = self.get_hash_for_signing()?;
102        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
103        signing::verify(&verification_key, &self.signature, hash.as_ref())
104    }
105
106    pub fn verify(&self) -> anyhow::Result<()> {
107        anyhow_assert_eq!(self.num_posts, self.encoded_post_lengths.len() as u8);
108        anyhow_assert_eq!(self.num_posts, self.encoded_post_ids.len() as u8);
109        for healed_id in &self.encoded_post_healed {
110            if !self.encoded_post_ids.contains(healed_id) {
111                anyhow::bail!("encoded_post_healed contains id not in encoded_post_ids: {}", healed_id);
112            }
113        }
114        self.signature_verify()?;
115        Ok(())
116    }
117}
118
119impl Display for EncodedPostBundleHeaderV1 {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        write!(f, "EncodedPostBundleHeaderV1 [ location_id: {}, time_millis: {}, num_posts: {}, overflowed: {}, sealed: {} ]", self.location_id, self.time_millis, self.num_posts, self.overflowed, self.sealed)
122    }
123}
124
125#[derive(Debug, PartialEq, Clone)]
126pub struct EncodedPostBundleV1 {
127    pub header: EncodedPostBundleHeaderV1,
128    pub encoded_posts_bytes: Bytes,
129}
130
131impl Display for EncodedPostBundleV1 {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.encoded_posts_bytes.len())
134    }
135}
136
137impl EncodedPostBundleV1 {
138    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
139        let mut bytes = BytesMut::new();
140
141        let json_post_bundle_header = json::struct_to_bytes(&self.header)?;
142        bytes.put_u8(1u8); // Version
143        bytes.put_u64(json_post_bundle_header.len() as u64);
144        bytes.put_u64(self.encoded_posts_bytes.len() as u64);
145        bytes.put_slice(json_post_bundle_header.as_ref());
146        bytes.put_slice(self.encoded_posts_bytes.as_ref());
147
148        Ok(bytes.freeze())
149    }
150
151    pub fn from_bytes(mut bytes: Bytes, decode_body: bool) -> anyhow::Result<Self> {
152        anyhow_assert_ge!(bytes.remaining(), 1, "Missing version");
153        let version = bytes.get_u8();
154        anyhow_assert_eq!(1, version, "Invalid version");
155
156        anyhow_assert_ge!(bytes.remaining(), 8, "Missing header length");
157        let header_len = bytes.get_u64() as usize;
158        anyhow_assert_ge!(bytes.remaining(), 8, "Missing body length");
159        let body_len = bytes.get_u64() as usize;
160
161        let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("header_len + body_len overflow"))?;
162        anyhow_assert_ge!(bytes.remaining(), total_length, "Truncated post bundle data");
163
164        let header_bytes = bytes.copy_to_bytes(header_len);
165        let header = json::bytes_to_struct(&header_bytes)?;
166
167        let body = match decode_body {
168            true => {
169                let body_bytes = bytes.copy_to_bytes(body_len);
170                anyhow_assert_eq!(bytes.remaining(), 0, "Excess data");
171                body_bytes
172            },
173            false => Bytes::new(),
174        };
175
176        Ok(EncodedPostBundleV1 {
177            header,
178            encoded_posts_bytes: body,
179        })
180    }
181
182    /// Verifies that this bundle is legitimate enough to cache.
183    ///
184    /// Checks:
185    /// 1. The bundle header is structurally valid and its signature verifies.
186    /// 2. The sum of `encoded_post_lengths` exactly spans `encoded_posts_bytes`, and
187    ///    the plaintext `post_id` prefix of each slice matches the corresponding entry
188    ///    in `encoded_post_ids`.
189    /// 3. Each post slice can be decrypted using `base_id`, which also verifies the
190    ///    per-post signature and hash integrity.
191    pub fn verify(&self, base_id: &Id) -> anyhow::Result<()> {
192        // (1) Header structure and signature
193        self.header.verify()?;
194
195        // (2) Lengths must exactly span the body
196        let total_length: usize = self.header.encoded_post_lengths.iter().sum();
197        if total_length != self.encoded_posts_bytes.len() {
198            anyhow::bail!(
199                "sum of encoded_post_lengths ({}) != encoded_posts_bytes length ({})",
200                total_length,
201                self.encoded_posts_bytes.len()
202            );
203        }
204
205        // (2) + (3) Per-post checks
206        let mut offset = 0usize;
207        for (i, (&length, expected_post_id)) in self.header.encoded_post_lengths.iter().zip(self.header.encoded_post_ids.iter()).enumerate() {
208            let post_bytes = self.encoded_posts_bytes.slice(offset..offset + length);
209
210            // Plaintext post_id prefix must match the header's claim before we do any crypto
211            if post_bytes.len() < ID_BYTES {
212                anyhow::bail!("post {}: bytes too short to contain post_id", i);
213            }
214            let actual_post_id = Id::from_slice(&post_bytes[..ID_BYTES])?;
215            if actual_post_id != *expected_post_id {
216                anyhow::bail!("post {}: id mismatch — header claims {} but bytes contain {}", i, expected_post_id, actual_post_id);
217            }
218
219            // Decrypt header (expect_body=true, decode_body=false): verifies per-post signature and hashes
220            EncodedPostV1::decode_from_bytes(post_bytes, base_id, true, false)
221                .map_err(|e| anyhow::anyhow!("post {}: failed to verify with base_id: {}", i, e))?;
222
223            offset += length;
224        }
225
226        Ok(())
227    }
228}
229
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use crate::client::key_locker::key_locker::{KeyLocker, KeyLockerManager};
235    use crate::client::key_locker::mem_key_locker::MemKeyLockerManager;
236    use std::sync::Arc;
237    use crate::protocol::posting::encoded_post::EncodedPostV1;
238    use crate::tools::server_id::ServerId;
239    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
240    use crate::tools::tools;
241    use crate::tools::types::Pow;
242    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
243
244    /// Builds a valid single-post bundle encrypted under `base_id`.
245    async fn make_valid_bundle(base_id: Id) -> anyhow::Result<EncodedPostBundleV1> {
246        let time_provider = RealTimeProvider::default();
247        let pow_generator = StubParallelPowGenerator::new();
248        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
249        let peer = server_id.to_peer(&time_provider)?;
250
251        let key_locker_manager = MemKeyLockerManager::new().await?;
252        let key_locker: Arc<dyn KeyLocker> = key_locker_manager.create("test keyphrase".to_string()).await?;
253        let client_id = key_locker.client_id();
254        let timestamp = time_provider.current_time_millis();
255
256        let mut encoded_post = EncodedPostV1::new(client_id, timestamp, vec![base_id], "test post content");
257        let post_bytes_obj = encoded_post.encode_to_bytes_direct(&key_locker).await?;
258        let post_bytes = Bytes::copy_from_slice(post_bytes_obj.bytes());
259
260        let mut header = EncodedPostBundleHeaderV1 {
261            time_millis: timestamp,
262            location_id: Id::random(),
263            overflowed: false,
264            sealed: false,
265            num_posts: 1,
266            encoded_post_ids: vec![encoded_post.post_id],
267            encoded_post_lengths: vec![post_bytes.len()],
268            encoded_post_healed: HashSet::new(),
269            peer,
270            signature: Signature::zero(),
271        };
272        header.signature_generate(&server_id.keys.signature_key)?;
273
274        Ok(EncodedPostBundleV1 { header, encoded_posts_bytes: post_bytes })
275    }
276
277    #[tokio::test]
278    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
279        let base_id = Id::random();
280        let bundle = make_valid_bundle(base_id).await?;
281        bundle.verify(&base_id)
282    }
283
284    #[tokio::test]
285    async fn test_verify_wrong_base_id() -> anyhow::Result<()> {
286        let base_id = Id::random();
287        let bundle = make_valid_bundle(base_id).await?;
288        let wrong_base_id = Id::random();
289        assert!(bundle.verify(&wrong_base_id).is_err());
290        Ok(())
291    }
292
293    #[tokio::test]
294    async fn test_verify_tampered_post_bytes() -> anyhow::Result<()> {
295        let base_id = Id::random();
296        let bundle = make_valid_bundle(base_id).await?;
297        let mut tampered_posts = bundle.encoded_posts_bytes.to_vec();
298        tampered_posts[ID_BYTES + 10] ^= 0xff; // flip a byte inside the encrypted header
299        let tampered_bundle = EncodedPostBundleV1 {
300            header: bundle.header,
301            encoded_posts_bytes: Bytes::from(tampered_posts),
302        };
303        assert!(tampered_bundle.verify(&base_id).is_err());
304        Ok(())
305    }
306
307    #[tokio::test]
308    async fn test_verify_wrong_post_id_in_header() -> anyhow::Result<()> {
309        let base_id = Id::random();
310        let mut bundle = make_valid_bundle(base_id).await?;
311        let pow_generator = StubParallelPowGenerator::new();
312        let server_id = ServerId::new(&RealTimeProvider::default(), Pow(0), true, &pow_generator).await?;
313        bundle.header.encoded_post_ids[0] = Id::random(); // wrong post_id
314        bundle.header.signature_generate(&server_id.keys.signature_key)?;
315        assert!(bundle.verify(&base_id).is_err());
316        Ok(())
317    }
318
319    #[tokio::test]
320    async fn test_verify_wrong_length_sum() -> anyhow::Result<()> {
321        let base_id = Id::random();
322        let mut bundle = make_valid_bundle(base_id).await?;
323        let pow_generator = StubParallelPowGenerator::new();
324        let server_id = ServerId::new(&RealTimeProvider::default(), Pow(0), true, &pow_generator).await?;
325        bundle.header.encoded_post_lengths[0] += 1; // length doesn't match bytes
326        bundle.header.signature_generate(&server_id.keys.signature_key)?;
327        assert!(bundle.verify(&base_id).is_err());
328        Ok(())
329    }
330
331    #[tokio::test]
332    async fn test_verify_tampered_post_length() -> anyhow::Result<()> {
333        // Changing encoded_post_lengths must invalidate the signature even when the
334        // structural checks (sum == body length, num_posts == lengths.len()) still pass.
335        let base_id = Id::random();
336        let bundle = make_valid_bundle(base_id).await?;
337        let original_length = bundle.header.encoded_post_lengths[0];
338        // Pad the body so the sum still matches, then bump the recorded length.
339        let mut tampered_posts = bundle.encoded_posts_bytes.to_vec();
340        tampered_posts.push(0u8); // one extra byte on the body
341        let mut tampered_header = bundle.header.clone();
342        tampered_header.encoded_post_lengths[0] = original_length + 1;
343        // Do NOT re-sign — the signature now covers a different lengths list.
344        let tampered_bundle = EncodedPostBundleV1 {
345            header: tampered_header,
346            encoded_posts_bytes: Bytes::from(tampered_posts),
347        };
348        assert!(tampered_bundle.verify(&base_id).is_err());
349        Ok(())
350    }
351
352    #[tokio::test]
353    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
354        let base_id = Id::random();
355        let mut bundle = make_valid_bundle(base_id).await?;
356        bundle.header.signature = Signature::zero(); // corrupt the bundle header signature
357        assert!(bundle.verify(&base_id).is_err());
358        Ok(())
359    }
360
361    #[tokio::test]
362    async fn encoded_post_bundle_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
363        let time_provider = RealTimeProvider::default();
364        let pow_generator = StubParallelPowGenerator::new();
365        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
366        let peer = server_id.to_peer(&time_provider)?;
367
368        let num_posts: u8 = 3;
369
370        let mut header = EncodedPostBundleHeaderV1 {
371            time_millis: TimeMillis::random(),
372            location_id: Id::random(),
373            overflowed: true,
374            sealed: false,
375            num_posts,
376            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
377            encoded_post_lengths: (0..num_posts).map(|_| tools::random_usize_bounded(1024)).collect(),
378            encoded_post_healed: HashSet::new(),
379            peer,
380            signature: Signature::zero(),
381        };
382
383        header.signature_generate(&server_id.keys.signature_key)?;
384        header.verify()?;
385
386        let total_bytes = header.encoded_post_lengths.iter().sum::<usize>();
387        let encoded_posts_bytes = Bytes::from(tools::random_bytes(total_bytes));
388
389        let bundle = EncodedPostBundleV1 {
390            header,
391            encoded_posts_bytes,
392        };
393
394        let bytes1 = bundle.to_bytes()?;
395        let decoded = EncodedPostBundleV1::from_bytes(Bytes::from(bytes1.clone()), true)?;
396
397        assert_eq!(bundle, decoded);
398
399        // Optional extra sanity check: encoding the decoded struct should be stable.
400        let bytes2 = decoded.to_bytes()?;
401        assert_eq!(bytes1, bytes2);
402
403        Ok(())
404    }
405
406    #[tokio::test]
407    async fn encoded_post_bundle_v1_to_from_bytes_roundtrip_without_body() -> anyhow::Result<()> {
408        let time_provider = RealTimeProvider::default();
409        let pow_generator = StubParallelPowGenerator::new();
410        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
411        let peer = server_id.to_peer(&time_provider)?;
412
413        let num_posts: u8 = 3;
414
415        let mut header = EncodedPostBundleHeaderV1 {
416            time_millis: TimeMillis::random(),
417            location_id: Id::random(),
418            overflowed: true,
419            sealed: false,
420            num_posts,
421            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
422            encoded_post_lengths: (0..num_posts).map(|_| tools::random_usize_bounded(1024)).collect(),
423            encoded_post_healed: HashSet::new(),
424            peer,
425            signature: Signature::zero(),
426        };
427
428        header.signature_generate(&server_id.keys.signature_key)?;
429        header.verify()?;
430
431        let total_bytes = header.encoded_post_lengths.iter().sum::<usize>();
432        let encoded_posts_bytes = Bytes::from(tools::random_bytes(total_bytes));
433
434        let bundle = EncodedPostBundleV1 {
435            header,
436            encoded_posts_bytes,
437        };
438
439        let bytes1 = bundle.to_bytes()?;
440        let decoded = EncodedPostBundleV1::from_bytes(Bytes::from(bytes1.clone()), false)?;
441
442        assert_eq!(bundle.header, decoded.header);
443        assert!(decoded.encoded_posts_bytes.is_empty());
444
445        Ok(())
446    }
447
448    // ── Robustness tests: EncodedPostBundleV1::from_bytes ──
449
450    #[test]
451    fn test_from_bytes_empty() {
452        assert!(EncodedPostBundleV1::from_bytes(Bytes::new(), true).is_err());
453    }
454
455    #[test]
456    fn test_from_bytes_wrong_version() {
457        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[99u8]), true).is_err());
458    }
459
460    #[test]
461    fn test_from_bytes_truncated_at_header_length() {
462        // Version byte only, no header length u64
463        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[1u8]), true).is_err());
464    }
465
466    #[test]
467    fn test_from_bytes_truncated_at_body_length() {
468        // Version + header_len=0, but no body_len u64
469        let mut bytes = BytesMut::new();
470        bytes.put_u8(1); // version
471        bytes.put_u64(0); // header_len
472        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
473    }
474
475    #[test]
476    fn test_from_bytes_header_len_exceeds_remaining() {
477        let mut bytes = BytesMut::new();
478        bytes.put_u8(1); // version
479        bytes.put_u64(99999); // header_len way too large
480        bytes.put_u64(0); // body_len
481        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
482    }
483
484    #[test]
485    fn test_from_bytes_overflow_lengths() {
486        let mut bytes = BytesMut::new();
487        bytes.put_u8(1); // version
488        bytes.put_u64(u64::MAX); // header_len
489        bytes.put_u64(1); // body_len — header_len + body_len overflows usize
490        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
491    }
492
493    #[test]
494    fn test_from_bytes_garbage() {
495        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[0xff; 128]), true).is_err());
496    }
497
498    #[cfg(not(target_arch = "wasm32"))]
499    mod bolero_fuzz {
500        use bytes::Bytes;
501        use crate::protocol::posting::encoded_post_bundle::EncodedPostBundleV1;
502
503        #[test]
504        fn fuzz_from_bytes() {
505            bolero::check!().for_each(|data: &[u8]| {
506                let _ = EncodedPostBundleV1::from_bytes(Bytes::copy_from_slice(data), true);
507            });
508        }
509
510        #[test]
511        fn fuzz_from_bytes_no_body() {
512            bolero::check!().for_each(|data: &[u8]| {
513                let _ = EncodedPostBundleV1::from_bytes(Bytes::copy_from_slice(data), false);
514            });
515        }
516    }
517}