hashiverse_lib/protocol/posting/
encoded_post_bundle_feedback.rs1use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1, ENTRY_SIZE};
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, Pow, Salt, Signature, SignatureKey, VerificationKey};
28use std::collections::HashMap;
29use crate::tools::{hashing, json, signing};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use std::fmt::{Debug, Display};
33use crate::anyhow_assert_eq;
34
35#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
37pub struct EncodedPostBundleFeedbackHeaderV1 {
38 pub time_millis: TimeMillis, pub location_id: Id,
40 pub feedbacks_bytes_hash: Hash, pub peer: Peer,
42
43 pub signature: Signature, }
45
46impl EncodedPostBundleFeedbackHeaderV1 {
47 pub fn get_hash_for_signing(&self) -> Hash {
48 let time_millis_be = self.time_millis.encode_be();
49
50 let mut hash_input: Vec<&[u8]> = vec![];
51
52 hash_input.push(time_millis_be.as_ref());
53 hash_input.push(self.location_id.as_ref());
54 hash_input.push(self.feedbacks_bytes_hash.as_ref());
55 hash_input.push(self.peer.signature.as_ref());
56
57 hashing::hash_multiple(&hash_input)
58 }
59
60 pub fn signature_generate(&mut self, signature_key: &SignatureKey) {
61 let hash = self.get_hash_for_signing();
62 self.signature = signing::sign(signature_key, hash.as_ref());
63 }
64
65 pub fn signature_verify(&self) -> anyhow::Result<()> {
66 let hash = self.get_hash_for_signing();
67 let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
68 signing::verify(&verification_key, &self.signature, hash.as_ref())
69 }
70
71 pub fn verify(&self) -> anyhow::Result<()> {
72 self.signature_verify()?;
73 Ok(())
74 }
75}
76
77impl Display for EncodedPostBundleFeedbackHeaderV1 {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(f, "EncodedPostBundleFeedbackHeaderV1 [ location_id: {}, time_millis: {} ]", self.location_id, self.time_millis)
80 }
81}
82
83#[derive(Debug, PartialEq, Clone)]
84pub struct EncodedPostBundleFeedbackV1 {
85 pub header: EncodedPostBundleFeedbackHeaderV1,
86 pub feedbacks_bytes: Bytes, }
88impl Display for EncodedPostBundleFeedbackV1 {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.feedbacks_bytes.len())
91 }
92}
93
94impl EncodedPostBundleFeedbackV1 {
95 pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
96 let mut bytes = BytesMut::new();
97
98 let header = json::struct_to_bytes(&self.header)?;
99 bytes.put_u8(1u8); bytes.put_u64(header.len() as u64);
101 bytes.put_u64(self.feedbacks_bytes.len() as u64);
102 bytes.put_slice(header.as_ref());
103 bytes.put_slice(self.feedbacks_bytes.as_ref());
104
105 Ok(bytes.freeze())
106 }
107
108 pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
109 if bytes.remaining() < 1 {
110 anyhow::bail!("Invalid buffer: missing post_bundle version");
111 }
112 let version = bytes.get_u8();
113 if 1 != version {
114 anyhow::bail!("Invalid buffer: unknown post_bundle version");
115 }
116
117 if bytes.remaining() < 16 {
118 anyhow::bail!("Invalid buffer: missing post_bundle lengths");
119 }
120
121 let header_len = bytes.get_u64() as usize;
122 let body_len = bytes.get_u64() as usize;
123 let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("total_length overflow"))?;
124 if bytes.remaining() < total_length {
125 anyhow::bail!("Invalid buffer: post_bundle data truncated");
126 }
127 let header_bytes = bytes.copy_to_bytes(header_len);
128 let body_bytes = bytes.copy_to_bytes(body_len);
129
130 let header = json::bytes_to_struct(&header_bytes)?;
131
132 Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes: body_bytes })
133 }
134
135 pub fn verify(&self) -> anyhow::Result<()> {
143 self.header.verify()?;
145
146 if self.feedbacks_bytes.len() % ENTRY_SIZE != 0 {
148 anyhow::bail!(
149 "feedbacks_bytes length ({}) is not a multiple of ENTRY_SIZE ({})",
150 self.feedbacks_bytes.len(),
151 ENTRY_SIZE
152 );
153 }
154
155 let feedbacks_bytes_hash = hashing::hash(self.feedbacks_bytes.as_ref());
157 anyhow_assert_eq!(feedbacks_bytes_hash, self.header.feedbacks_bytes_hash, "feedbacks_bytes_hash mismatch");
158
159 let num_entries = self.feedbacks_bytes.len() / ENTRY_SIZE;
161 for i in 0..num_entries {
162 let entry_bytes = &self.feedbacks_bytes[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE];
163 let feedback = EncodedPostFeedbackV1::decode_from_bytes(&mut &entry_bytes[..])
164 .map_err(|e| anyhow::anyhow!("feedback {}: invalid entry: {}", i, e))?;
165 feedback.pow_verify()
166 .map_err(|e| anyhow::anyhow!("feedback {}: pow verification failed: {}", i, e))?;
167 }
168
169 Ok(())
170 }
171
172 pub fn get_post_pow_for_feedback_type(&self, post_id: &Id, feedback_type: u8) -> Pow {
173 for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes) {
174 if let Ok(view) = view {
175 if view.feedback_type() == feedback_type && view.post_id_bytes() == post_id.as_ref() {
177 return view.pow();
178 }
179 }
180 }
181 Pow(0)
182 }
183
184 pub fn merge(bundles: &[Self]) -> Option<Self> {
189 let header = bundles.iter().max_by_key(|b| b.header.time_millis)?.header.clone();
190
191 let mut global_max: HashMap<(Id, u8), EncodedPostFeedbackV1> = HashMap::new();
192 for bundle in bundles {
193 for view in EncodedPostFeedbackViewV1::iter(&bundle.feedbacks_bytes) {
194 let Ok(view) = view else { continue };
195 let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
196 let key = (post_id, view.feedback_type());
197 let entry = global_max.entry(key).or_insert_with(|| EncodedPostFeedbackV1 {
198 post_id,
199 feedback_type: view.feedback_type(),
200 salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
201 pow: view.pow(),
202 });
203 if view.pow() > entry.pow {
204 entry.salt = Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero());
205 entry.pow = view.pow();
206 }
207 }
208 }
209
210 let mut feedbacks_bytes_mut = Vec::new();
211 for feedback in global_max.values() {
212 let _ = feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut);
213 }
214
215 Some(Self { header, feedbacks_bytes: Bytes::from(feedbacks_bytes_mut) })
216 }
217
218 pub fn get_post_pows(&self, post_id: &Id) -> [Pow; 256] {
219 let mut result = [Pow(0); 256];
220 for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes) {
221 if let Ok(view) = view {
222 if view.post_id_bytes() == post_id.as_ref() {
223 result[view.feedback_type() as usize] = view.pow();
224 }
225 }
226 }
227
228 result
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
    use crate::tools::pow;
    use crate::tools::server_id::ServerId;
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};

    /// Builds a bundle holding one feedback entry, signed by a freshly created
    /// `ServerId`.
    ///
    /// The signing identity is returned alongside the bundle so that tests which
    /// tamper with the bundle can re-sign the header with the key that matches
    /// `header.peer`. Re-signing with any other identity would make `verify`
    /// fail at the signature check and mask the condition under test.
    async fn make_valid_bundle() -> anyhow::Result<(EncodedPostBundleFeedbackV1, ServerId)> {
        let time_provider = RealTimeProvider::default();
        let pow_generator = StubParallelPowGenerator::new();
        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let post_id = Id::random();
        let feedback_type = 1u8;
        let data_hash = pow::pow_compute_data_hash(&[post_id.as_bytes(), &[feedback_type]]);
        let (salt, achieved_pow, _) = pow::pow_generate_with_iteration_limit(1, Pow(0), &data_hash).await?;
        let feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, achieved_pow);

        let mut feedbacks_bytes_mut = Vec::new();
        feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut)?;
        let feedbacks_bytes = Bytes::from(feedbacks_bytes_mut);

        let mut header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: time_provider.current_time_millis(),
            location_id: Id::random(),
            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
            peer,
            signature: Signature::zero(),
        };
        header.signature_generate(&server_id.keys.signature_key);

        Ok((EncodedPostBundleFeedbackV1 { header, feedbacks_bytes }, server_id))
    }

    #[tokio::test]
    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
        let (bundle, _signer) = make_valid_bundle().await?;
        bundle.verify()
    }

    #[tokio::test]
    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
        let (mut bundle, _signer) = make_valid_bundle().await?;
        bundle.header.signature = Signature::zero();
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_feedbacks_hash() -> anyhow::Result<()> {
        let (mut bundle, signer) = make_valid_bundle().await?;
        bundle.header.feedbacks_bytes_hash = hashing::hash(b"wrong");
        // Re-sign with the bundle's own identity so only the hash is wrong.
        bundle.header.signature_generate(&signer.keys.signature_key);

        // The header alone must still verify; otherwise the assertion below
        // would pass for the wrong reason.
        bundle.header.verify()?;
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_partial_entry() -> anyhow::Result<()> {
        let (mut bundle, signer) = make_valid_bundle().await?;

        // One extra byte makes the payload length a non-multiple of ENTRY_SIZE.
        let mut bytes = bundle.feedbacks_bytes.to_vec();
        bytes.push(0u8);
        bundle.feedbacks_bytes = Bytes::from(bytes);
        bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
        bundle.header.signature_generate(&signer.keys.signature_key);

        bundle.header.verify()?;
        let err = bundle.verify().unwrap_err();
        assert!(
            err.to_string().contains("not a multiple of ENTRY_SIZE"),
            "unexpected error: {}",
            err
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_pow() -> anyhow::Result<()> {
        let (mut bundle, signer) = make_valid_bundle().await?;

        // Corrupt the final byte of the single entry.
        // NOTE(review): presumably this is the claimed pow — confirm against the entry layout.
        let mut bytes = bundle.feedbacks_bytes.to_vec();
        let last = bytes.last_mut().unwrap();
        *last = last.wrapping_add(1);
        bundle.feedbacks_bytes = Bytes::from(bytes);
        bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
        bundle.header.signature_generate(&signer.keys.signature_key);

        // Signature and payload hash are consistent, so any failure now comes
        // from the per-entry checks.
        bundle.header.verify()?;
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn encoded_post_bundle_header_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider::default();
        let pow_generator = StubParallelPowGenerator::new();
        let server_id = ServerId::new(&time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;
        let feedbacks_bytes = Bytes::new();

        let mut header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: TimeMillis::random(),
            location_id: Id::random(),
            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
            peer,
            signature: Signature::zero(),
        };

        header.signature_generate(&server_id.keys.signature_key);
        header.verify()?;

        let bundle = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };

        let bytes1 = bundle.to_bytes()?;
        let decoded = EncodedPostBundleFeedbackV1::from_bytes(bytes1.clone())?;

        assert_eq!(bundle, decoded);

        // Re-encoding the decoded bundle must reproduce the original bytes.
        let bytes2 = decoded.to_bytes()?;
        assert_eq!(bytes1, bytes2);

        Ok(())
    }
}