// hashiverse_lib/tools/bytes_gatherer.rs
1//! # Scatter-gather buffer for zero-copy response assembly
2//!
3//! The single export from this module — [`BytesGatherer`] — is how large payloads (post
4//! bundles, feedback bundles) are threaded from storage, through the protocol layer,
5//! through compression, and on to the transport, without ever being memcpy'd by the
6//! application.
7//!
8//! Small POD fields (RPC headers, counters, signatures) accumulate into an internal
9//! `BytesMut` scratchpad; large already-refcounted `Bytes` blobs attach by reference with
10//! [`BytesGatherer::put_bytes`]. At the transport boundary [`BytesGatherer::compact`]
11//! merges adjacent small segments into one and [`BytesGatherer::finish`] yields a
12//! `Vec<Bytes>` suitable for vectored I/O or HTTP/2 DATA frames.
13//!
14//! See the doc comment on [`BytesGatherer`] for the motivating `GetPostBundle` example
15//! and the on-the-wire segment layout.
16
17use bytes::{BufMut, Bytes, BytesMut};
18
/// A scatter-gather buffer that threads large blobs through the response pipeline by reference,
/// eliminating application-layer copies for bulk data such as post bundles.
///
/// # Motivation
///
/// When serving a `GetPostBundleV1` response the post bundle payload (potentially hundreds of KB,
/// already brotli-compressed on disk) was previously copied multiple times before reaching the network.
/// Then at the transport boundary the single `Bytes` was handed to HTTPS/TCP with no opportunity
/// for vectored writes.
///
/// `BytesGatherer` fixes this by threading large blobs through the entire response path **by
/// reference**. Small POD fields (header bytes, JSON metadata) accumulate into an internal
/// `BytesMut` scratchpad, while large payloads are attached with [`put_bytes`] — a refcount bump
/// only, no memcpy. At the transport boundary [`compact`] merges any adjacent small segments, sets
/// `Content-Length`, and HTTP/2 receives a stream of `Bytes` chunks as separate DATA frames.
///
/// **Net result for `GetPostBundle`, the hottest path of Hashiverse:** multiple application-layer
/// copies of the post bundle → 0. One kernel copy to the network card remains (unavoidable).
///
/// After `compact(1024)` the segment layout for a typical `GetPostBundle` response looks like:
///
/// ```text
/// Segment 0: [rpc_header (~300 B) + response_header (peers/token/counts) (~50 B) + flag (1 B)]
///            ← merged by compact, one BytesMut flush
/// Segment 1: post_bundle Bytes (e.g. 500 KB)
///            ← original Bytes from disk, zero copy
/// ```
///
/// HTTP/2 sends these as 2 DATA frames. TCP concatenates to 1 contiguous buffer via [`to_bytes`].
///
/// [`put_bytes`]: BytesGatherer::put_bytes
/// [`compact`]: BytesGatherer::compact
/// [`to_bytes`]: BytesGatherer::to_bytes
#[derive(Default)]
pub struct BytesGatherer {
    // Committed segments; each is emitted as its own chunk at the transport.
    segments: Vec<Bytes>,
    // Scratchpad accumulating small POD writes until the next flush.
    current: BytesMut,
}
57
58impl BytesGatherer {
59 pub fn from_bytes(bytes: Bytes) -> Self {
60 let mut bytes_gatherer = Self::default();
61 bytes_gatherer.put_bytes(bytes);
62 bytes_gatherer
63 }
64
65 // Small POD fields → accumulate into current BytesMut
66 pub fn put_u8(&mut self, v: u8) {
67 self.current.put_u8(v);
68 }
69 pub fn put_u16_le(&mut self, v: u16) {
70 self.current.put_u16_le(v);
71 }
72 pub fn put_u16(&mut self, v: u16) {
73 self.current.put_u16(v);
74 }
75 pub fn put_u32_le(&mut self, v: u32) {
76 self.current.put_u32_le(v);
77 }
78 pub fn put_u32(&mut self, v: u32) {
79 self.current.put_u32(v);
80 }
81 pub fn put_u64(&mut self, v: u64) {
82 self.current.put_u64(v);
83 }
84 pub fn put_slice(&mut self, s: &[u8]) {
85 self.current.put_slice(s);
86 }
87
88 /// Push a large blob — flush accumulator first, store blob by reference (zero copy).
89 pub fn put_bytes(&mut self, bytes: Bytes) {
90 if bytes.is_empty() {
91 return;
92 }
93
94 self.flush();
95 self.segments.push(bytes);
96 }
97
98 /// Merge another gatherer — zero copy for blobs; fast path if both are still assembling.
99 ///
100 /// If `other` has no committed segments yet (all bytes are still in its scratchpad),
101 /// the scratchpad bytes are appended directly into `self.current` — no flush, no new segment.
102 pub fn put_bytes_gatherer(&mut self, mut other: BytesGatherer) {
103 if other.segments.is_empty() {
104 // Fast path: other is still assembling — absorb its scratchpad into ours.
105 if !other.current.is_empty() {
106 self.current.put_slice(&other.current);
107 }
108 return;
109 }
110
111 other.flush();
112 self.flush();
113 self.segments.extend(other.segments);
114 }
115
116 /// Returns `true` if the gatherer contains no bytes.
117 pub fn is_empty(&self) -> bool {
118 self.segments.is_empty() && self.current.is_empty()
119 }
120
121 /// Total byte count across all segments + current accumulator.
122 pub fn len(&self) -> usize {
123 self.segments.iter().map(|b| b.len()).sum::<usize>() + self.current.len()
124 }
125
126 /// Merge adjacent segments smaller than `threshold` bytes into single segments.
127 /// Large segments pass through untouched (zero copy).
128 /// Generally, gall this at the transport boundary before streaming to optimise streaming window fragmentation.
129 pub fn compact(mut self, threshold: usize) -> Self {
130 self.flush();
131 let mut out: Vec<Bytes> = Vec::with_capacity(self.segments.len());
132 let mut acc = BytesMut::new();
133 for seg in self.segments {
134 if seg.len() < threshold {
135 acc.extend_from_slice(&seg);
136 }
137 else {
138 if !acc.is_empty() {
139 out.push(acc.split().freeze());
140 }
141 out.push(seg); // large — zero copy
142 }
143 }
144 if !acc.is_empty() {
145 out.push(acc.freeze());
146 }
147 Self { segments: out, current: BytesMut::new() }
148 }
149
150 /// Consume into `Vec<Bytes>` for streaming / vectored I/O.
151 pub fn finish(mut self) -> Vec<Bytes> {
152 self.flush();
153 self.segments
154 }
155
156 /// Consume into one contiguous Bytes.
157 /// Zero-copy fast path: single segment returned directly.
158 pub fn to_bytes(mut self) -> Bytes {
159 self.flush();
160 if self.segments.len() == 1 {
161 return self.segments.remove(0);
162 }
163 let total = self.segments.iter().map(|b| b.len()).sum();
164 let mut out = BytesMut::with_capacity(total);
165 for seg in self.segments {
166 out.extend_from_slice(&seg);
167 }
168 out.freeze()
169 }
170
171 fn flush(&mut self) {
172 if !self.current.is_empty() {
173 self.segments.push(self.current.split().freeze());
174 }
175 }
176}