Skip to main content

hashiverse_lib/protocol/posting/
encoded_post_bundle_feedback.rs

1//! # `EncodedPostBundleFeedbackV1` — server-signed aggregate feedback for a bundle
2//!
3//! Feedback (reactions, upvotes/downvotes, flags) on the posts in a single
4//! `(location, time bucket)` is aggregated into its own bundle, parallel to — but
5//! independent of — the [`crate::protocol::posting::encoded_post_bundle`]. The two are
6//! split because feedback can change long after the post bundle itself has sealed, and
7//! many consumers don't need the feedback layer at all.
8//!
9//! Wire format:
10//!
11//! - **Header** (server-signed) — lists every feedback entry's `(post_id,
12//!   feedback_type, salt, pow)` along with a hash of the body. Signed by the
13//!   publishing [`crate::protocol::peer::Peer`].
14//! - **Body** — a packed array of fixed-size
15//!   [`crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1`] entries,
16//!   exactly `N * ENTRY_SIZE` bytes. Fixed sizing means iteration and verification are
17//!   O(N) with no per-entry header overhead.
18//!
19//! `verify()` checks the signature, confirms body length is a multiple of `ENTRY_SIZE`,
20//! re-hashes the body and compares against the header's hash, and checks the
21//! proof-of-work on every entry against the numeraire — Sybil-resistant voting in one
22//! pass.
23
24use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1, ENTRY_SIZE};
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, Pow, Salt, Signature, SignatureKey, VerificationKey};
28use std::collections::HashMap;
29use crate::tools::{hashing, json, signing};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use std::fmt::{Debug, Display};
33use crate::anyhow_assert_eq;
34
35// Contains all the posts for a given location_id in a particular bucket
36#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
37pub struct EncodedPostBundleFeedbackHeaderV1 {
38    pub time_millis: TimeMillis, // When was this bundle last updated
39    pub location_id: Id,
40    pub feedbacks_bytes_hash: Hash, // Hash(feedbacks_bytes from the body)
41    pub peer: Peer,
42
43    pub signature: Signature, // Peer.sign(get_hash_for_signing())
44}
45
46impl EncodedPostBundleFeedbackHeaderV1 {
47    pub fn get_hash_for_signing(&self) -> Hash {
48        let time_millis_be = self.time_millis.encode_be();
49
50        let mut hash_input: Vec<&[u8]> = vec![];
51
52        hash_input.push(time_millis_be.as_ref());
53        hash_input.push(self.location_id.as_ref());
54        hash_input.push(self.feedbacks_bytes_hash.as_ref());
55        hash_input.push(self.peer.signature.as_ref());
56
57        hashing::hash_multiple(&hash_input)
58    }
59
60    pub fn signature_generate(&mut self, signature_key: &SignatureKey) {
61        let hash = self.get_hash_for_signing();
62        self.signature = signing::sign(signature_key, hash.as_ref());
63    }
64
65    pub fn signature_verify(&self) -> anyhow::Result<()> {
66        let hash = self.get_hash_for_signing();
67        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
68        signing::verify(&verification_key, &self.signature, hash.as_ref())
69    }
70
71    pub fn verify(&self) -> anyhow::Result<()> {
72        self.signature_verify()?;
73        Ok(())
74    }
75}
76
77impl Display for EncodedPostBundleFeedbackHeaderV1 {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(f, "EncodedPostBundleFeedbackHeaderV1 [ location_id: {}, time_millis: {} ]", self.location_id, self.time_millis)
80    }
81}
82
83#[derive(Debug, PartialEq, Clone)]
84pub struct EncodedPostBundleFeedbackV1 {
85    pub header: EncodedPostBundleFeedbackHeaderV1,
86    pub feedbacks_bytes: Bytes, // A concatenated array of EncodedPostFeedbackV1
87}
88impl Display for EncodedPostBundleFeedbackV1 {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.feedbacks_bytes.len())
91    }
92}
93
94impl EncodedPostBundleFeedbackV1 {
95    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
96        let mut bytes = BytesMut::new();
97
98        let header = json::struct_to_bytes(&self.header)?;
99        bytes.put_u8(1u8); // Version
100        bytes.put_u64(header.len() as u64);
101        bytes.put_u64(self.feedbacks_bytes.len() as u64);
102        bytes.put_slice(header.as_ref());
103        bytes.put_slice(self.feedbacks_bytes.as_ref());
104
105        Ok(bytes.freeze())
106    }
107
108    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
109        if bytes.remaining() < 1 {
110            anyhow::bail!("Invalid buffer: missing post_bundle version");
111        }
112        let version = bytes.get_u8();
113        if 1 != version {
114            anyhow::bail!("Invalid buffer: unknown post_bundle version");
115        }
116
117        if bytes.remaining() < 16 {
118            anyhow::bail!("Invalid buffer: missing post_bundle lengths");
119        }
120
121        let header_len = bytes.get_u64() as usize;
122        let body_len = bytes.get_u64() as usize;
123        let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("total_length overflow"))?;
124        if bytes.remaining() < total_length {
125            anyhow::bail!("Invalid buffer: post_bundle data truncated");
126        }
127        let header_bytes = bytes.copy_to_bytes(header_len);
128        let body_bytes = bytes.copy_to_bytes(body_len);
129
130        let header = json::bytes_to_struct(&header_bytes)?;
131
132        Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes: body_bytes })
133    }
134
135    /// Verifies that this bundle is legitimate enough to cache.
136    ///
137    /// Checks:
138    /// 1. The bundle header is valid and its signature verifies.
139    /// 2. The feedback body is a valid multiple of `ENTRY_SIZE` (no partial entries).
140    /// 3. The header's hash is the same as the body's
141    /// 4. Each feedback entry's proof-of-work is correct.
142    pub fn verify(&self) -> anyhow::Result<()> {
143        // (1) Header signature
144        self.header.verify()?;
145
146        // (2) Body must be an exact multiple of the fixed entry size
147        if self.feedbacks_bytes.len() % ENTRY_SIZE != 0 {
148            anyhow::bail!(
149                "feedbacks_bytes length ({}) is not a multiple of ENTRY_SIZE ({})",
150                self.feedbacks_bytes.len(),
151                ENTRY_SIZE
152            );
153        }
154
155        // (3) Hash
156        let feedbacks_bytes_hash = hashing::hash(self.feedbacks_bytes.as_ref());
157        anyhow_assert_eq!(feedbacks_bytes_hash, self.header.feedbacks_bytes_hash, "feedbacks_bytes_hash mismatch");
158
159        // (4) PoW check for every entry
160        let num_entries = self.feedbacks_bytes.len() / ENTRY_SIZE;
161        for i in 0..num_entries {
162            let entry_bytes = &self.feedbacks_bytes[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE];
163            let feedback = EncodedPostFeedbackV1::decode_from_bytes(&mut &entry_bytes[..])
164                .map_err(|e| anyhow::anyhow!("feedback {}: invalid entry: {}", i, e))?;
165            feedback.pow_verify()
166                .map_err(|e| anyhow::anyhow!("feedback {}: pow verification failed: {}", i, e))?;
167        }
168
169        Ok(())
170    }
171
172    pub fn get_post_pow_for_feedback_type(&self, post_id: &Id, feedback_type: u8) -> Pow {
173        for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes) {
174            if let Ok(view) = view {
175                // Check feedback_type first as it short circuits and is therefore faster...
176                if view.feedback_type() == feedback_type && view.post_id_bytes() == post_id.as_ref() {
177                    return view.pow();
178                }
179            }
180        }
181        Pow(0)
182    }
183
184    /// Unions an array of bundles by taking the highest-pow entry for each
185    /// `(post_id, feedback_type)` pair across all bundles.  The header is
186    /// taken from the most-recently-timestamped bundle.  Returns `None` if
187    /// `bundles` is empty.
188    pub fn merge(bundles: &[Self]) -> Option<Self> {
189        let header = bundles.iter().max_by_key(|b| b.header.time_millis)?.header.clone();
190
191        let mut global_max: HashMap<(Id, u8), EncodedPostFeedbackV1> = HashMap::new();
192        for bundle in bundles {
193            for view in EncodedPostFeedbackViewV1::iter(&bundle.feedbacks_bytes) {
194                let Ok(view) = view else { continue };
195                let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
196                let key = (post_id, view.feedback_type());
197                let entry = global_max.entry(key).or_insert_with(|| EncodedPostFeedbackV1 {
198                    post_id,
199                    feedback_type: view.feedback_type(),
200                    salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
201                    pow: view.pow(),
202                });
203                if view.pow() > entry.pow {
204                    entry.salt = Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero());
205                    entry.pow = view.pow();
206                }
207            }
208        }
209
210        let mut feedbacks_bytes_mut = Vec::new();
211        for feedback in global_max.values() {
212            let _ = feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut);
213        }
214
215        Some(Self { header, feedbacks_bytes: Bytes::from(feedbacks_bytes_mut) })
216    }
217
218    pub fn get_post_pows(&self, post_id: &Id) -> [Pow; 256] {
219        let mut result = [Pow(0); 256];
220        for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes) {
221            if let Ok(view) = view {
222                if  view.post_id_bytes() == post_id.as_ref() {
223                    result[view.feedback_type() as usize] = view.pow();
224                }
225            }
226        }
227
228        result
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use crate::tools::server_id::ServerId;
236    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
237    use crate::tools::pow;
238    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
239
240    /// Builds a valid single-entry feedback bundle.
241    async fn make_valid_bundle() -> anyhow::Result<EncodedPostBundleFeedbackV1> {
242        let time_provider = RealTimeProvider::default();
243        let pow_generator = StubParallelPowGenerator::new();
244        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
245        let peer = server_id.to_peer(&time_provider)?;
246
247        let post_id = Id::random();
248        let feedback_type = 1u8;
249        // One iteration with Pow(0) always succeeds immediately — cheap in tests
250        let data_hash = pow::pow_compute_data_hash(&[post_id.as_bytes(), &[feedback_type]]);
251        let (salt, achieved_pow, _) = pow::pow_generate_with_iteration_limit(1, Pow(0), &data_hash).await?;
252        let feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, achieved_pow);
253
254        let mut feedbacks_bytes_mut = Vec::new();
255        feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut)?;
256        let feedbacks_bytes = Bytes::from(feedbacks_bytes_mut);
257
258        let mut header = EncodedPostBundleFeedbackHeaderV1 {
259            time_millis: time_provider.current_time_millis(),
260            location_id: Id::random(),
261            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
262            peer,
263            signature: Signature::zero(),
264        };
265        header.signature_generate(&server_id.keys.signature_key);
266
267        Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes })
268    }
269
270    #[tokio::test]
271    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
272        let bundle = make_valid_bundle().await?;
273        bundle.verify()
274    }
275
276    #[tokio::test]
277    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
278        let mut bundle = make_valid_bundle().await?;
279        bundle.header.signature = Signature::zero();
280        assert!(bundle.verify().is_err());
281        Ok(())
282    }
283
284    #[tokio::test]
285    async fn test_verify_wrong_feedbacks_hash() -> anyhow::Result<()> {
286        let mut bundle = make_valid_bundle().await?;
287        let pow_generator = StubParallelPowGenerator::new();
288        let server_id = ServerId::new(&RealTimeProvider::default(), Pow(0), true, &pow_generator).await?;
289        bundle.header.feedbacks_bytes_hash = hashing::hash(b"wrong");
290        bundle.header.signature_generate(&server_id.keys.signature_key); // re-sign so header sig itself is valid
291        assert!(bundle.verify().is_err());
292        Ok(())
293    }
294
295    #[tokio::test]
296    async fn test_verify_partial_entry() -> anyhow::Result<()> {
297        let mut bundle = make_valid_bundle().await?;
298        let pow_generator = StubParallelPowGenerator::new();
299        let server_id = ServerId::new(&RealTimeProvider::default(), Pow(0), true, &pow_generator).await?;
300        // Append one extra byte to make the length not a multiple of ENTRY_SIZE
301        let mut bytes = bundle.feedbacks_bytes.to_vec();
302        bytes.push(0u8);
303        bundle.feedbacks_bytes = Bytes::from(bytes);
304        bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
305        bundle.header.signature_generate(&server_id.keys.signature_key);
306        assert!(bundle.verify().is_err());
307        Ok(())
308    }
309
310    #[tokio::test]
311    async fn test_verify_wrong_pow() -> anyhow::Result<()> {
312        let mut bundle = make_valid_bundle().await?;
313        let pow_generator = StubParallelPowGenerator::new();
314        let server_id = ServerId::new(&RealTimeProvider::default(), Pow(0), true, &pow_generator).await?;
315        // Flip the last byte of the entry (the pow byte) to an incorrect value
316        let mut bytes = bundle.feedbacks_bytes.to_vec();
317        let last = bytes.last_mut().unwrap();
318        *last = last.wrapping_add(1);
319        bundle.feedbacks_bytes = Bytes::from(bytes);
320        bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
321        bundle.header.signature_generate(&server_id.keys.signature_key);
322        assert!(bundle.verify().is_err());
323        Ok(())
324    }
325
326    #[tokio::test]
327    async fn encoded_post_bundle_header_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
328        let time_provider = RealTimeProvider::default();
329        let pow_generator = StubParallelPowGenerator::new();
330        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
331        let peer = server_id.to_peer(&time_provider)?;
332        let feedbacks_bytes =  Bytes::new();
333
334        let mut header = EncodedPostBundleFeedbackHeaderV1 {
335            time_millis: TimeMillis::random(),
336            location_id: Id::random(),
337            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
338            peer,
339            signature: Signature::zero(),
340        };
341
342        header.signature_generate(&server_id.keys.signature_key);
343        header.verify()?;
344
345        let bundle = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };
346
347        let bytes1 = bundle.to_bytes()?;
348        let decoded = EncodedPostBundleFeedbackV1::from_bytes(bytes1.clone())?;
349
350        assert_eq!(bundle, decoded);
351
352        // Optional extra sanity check: encoding the decoded struct should be stable.
353        let bytes2 = decoded.to_bytes()?;
354        assert_eq!(bytes1, bytes2);
355
356        Ok(())
357    }
358}