// hashiverse_lib/tools/bytes_gatherer.rs

1//! # Scatter-gather buffer for zero-copy response assembly
2//!
3//! The single export from this module — [`BytesGatherer`] — is how large payloads (post
4//! bundles, feedback bundles) are threaded from storage, through the protocol layer,
5//! through compression, and on to the transport, without ever being memcpy'd by the
6//! application.
7//!
8//! Small POD fields (RPC headers, counters, signatures) accumulate into an internal
9//! `BytesMut` scratchpad; large already-refcounted `Bytes` blobs attach by reference with
10//! [`BytesGatherer::put_bytes`]. At the transport boundary [`BytesGatherer::compact`]
11//! merges adjacent small segments into one and [`BytesGatherer::finish`] yields a
12//! `Vec<Bytes>` suitable for vectored I/O or HTTP/2 DATA frames.
13//!
14//! See the doc comment on [`BytesGatherer`] for the motivating `GetPostBundle` example
15//! and the on-the-wire segment layout.
16
17use bytes::{BufMut, Bytes, BytesMut};
18
/// A scatter-gather buffer that threads large blobs through the response pipeline by reference,
/// eliminating application-layer copies for bulk data such as post bundles.
///
/// # Motivation
///
/// When serving a `GetPostBundleV1` response the post bundle payload (potentially hundreds of KB,
/// already brotli-compressed on disk) was previously copied multiple times before reaching the network.
/// Then at the transport boundary the single `Bytes` was handed to HTTPS/TCP with no opportunity
/// for vectored writes.
///
/// `BytesGatherer` fixes this by threading large blobs through the entire response path **by
/// reference**. Small POD fields (header bytes, JSON metadata) accumulate into an internal
/// `BytesMut` scratchpad, while large payloads are attached with [`put_bytes`] — a refcount bump
/// only, no memcpy. At the transport boundary [`compact`] merges any adjacent small segments, sets
/// `Content-Length`, and HTTP/2 receives a stream of `Bytes` chunks as separate DATA frames.
///
/// **Net result for `GetPostBundle`, the hottest path of Hashiverse:** multiple application-layer
/// copies of the post bundle → 0. One kernel copy to the network card remains (unavoidable).
///
/// After `compact(1024)` the segment layout for a typical `GetPostBundle` response looks like:
///
/// ```text
/// Segment 0: [rpc_header (~300 B) + response_header (peers/token/counts) (~50 B) + flag (1 B)]
///            ← merged by compact, one BytesMut flush
/// Segment 1: post_bundle Bytes (e.g. 500 KB)
///            ← original Bytes from disk, zero copy
/// ```
///
/// HTTP/2 sends these as 2 DATA frames. TCP concatenates to 1 contiguous buffer via [`to_bytes`].
///
/// [`put_bytes`]: BytesGatherer::put_bytes
/// [`compact`]: BytesGatherer::compact
/// [`to_bytes`]: BytesGatherer::to_bytes
#[derive(Default)]
pub struct BytesGatherer {
    // Committed segments; each large blob is attached by reference (refcount bump, no memcpy).
    segments: Vec<Bytes>,
    // Scratchpad accumulating small POD fields until the next flush commits it as a segment.
    current: BytesMut,
}
57
58impl BytesGatherer {
59    pub fn from_bytes(bytes: Bytes) -> Self {
60        let mut bytes_gatherer = Self::default();
61        bytes_gatherer.put_bytes(bytes);
62        bytes_gatherer
63    }
64
65    // Small POD fields → accumulate into current BytesMut
66    pub fn put_u8(&mut self, v: u8) {
67        self.current.put_u8(v);
68    }
69    pub fn put_u16_le(&mut self, v: u16) {
70        self.current.put_u16_le(v);
71    }
72    pub fn put_u16(&mut self, v: u16) {
73        self.current.put_u16(v);
74    }
75    pub fn put_u32_le(&mut self, v: u32) {
76        self.current.put_u32_le(v);
77    }
78    pub fn put_u32(&mut self, v: u32) {
79        self.current.put_u32(v);
80    }
81    pub fn put_u64(&mut self, v: u64) {
82        self.current.put_u64(v);
83    }
84    pub fn put_slice(&mut self, s: &[u8]) {
85        self.current.put_slice(s);
86    }
87
88    /// Push a large blob — flush accumulator first, store blob by reference (zero copy).
89    pub fn put_bytes(&mut self, bytes: Bytes) {
90        if bytes.is_empty() {
91            return;
92        }
93
94        self.flush();
95        self.segments.push(bytes);
96    }
97
98    /// Merge another gatherer — zero copy for blobs; fast path if both are still assembling.
99    ///
100    /// If `other` has no committed segments yet (all bytes are still in its scratchpad),
101    /// the scratchpad bytes are appended directly into `self.current` — no flush, no new segment.
102    pub fn put_bytes_gatherer(&mut self, mut other: BytesGatherer) {
103        if other.segments.is_empty() {
104            // Fast path: other is still assembling — absorb its scratchpad into ours.
105            if !other.current.is_empty() {
106                self.current.put_slice(&other.current);
107            }
108            return;
109        }
110
111        other.flush();
112        self.flush();
113        self.segments.extend(other.segments);
114    }
115
116    /// Returns `true` if the gatherer contains no bytes.
117    pub fn is_empty(&self) -> bool {
118        self.segments.is_empty() && self.current.is_empty()
119    }
120
121    /// Total byte count across all segments + current accumulator.
122    pub fn len(&self) -> usize {
123        self.segments.iter().map(|b| b.len()).sum::<usize>() + self.current.len()
124    }
125
126    /// Merge adjacent segments smaller than `threshold` bytes into single segments.
127    /// Large segments pass through untouched (zero copy).
128    /// Generally, gall this at the transport boundary before streaming to optimise streaming window fragmentation.
129    pub fn compact(mut self, threshold: usize) -> Self {
130        self.flush();
131        let mut out: Vec<Bytes> = Vec::with_capacity(self.segments.len());
132        let mut acc = BytesMut::new();
133        for seg in self.segments {
134            if seg.len() < threshold {
135                acc.extend_from_slice(&seg);
136            }
137            else {
138                if !acc.is_empty() {
139                    out.push(acc.split().freeze());
140                }
141                out.push(seg); // large — zero copy
142            }
143        }
144        if !acc.is_empty() {
145            out.push(acc.freeze());
146        }
147        Self { segments: out, current: BytesMut::new() }
148    }
149
150    /// Consume into `Vec<Bytes>` for streaming / vectored I/O.
151    pub fn finish(mut self) -> Vec<Bytes> {
152        self.flush();
153        self.segments
154    }
155
156    /// Consume into one contiguous Bytes.
157    /// Zero-copy fast path: single segment returned directly.
158    pub fn to_bytes(mut self) -> Bytes {
159        self.flush();
160        if self.segments.len() == 1 {
161            return self.segments.remove(0);
162        }
163        let total = self.segments.iter().map(|b| b.len()).sum();
164        let mut out = BytesMut::with_capacity(total);
165        for seg in self.segments {
166            out.extend_from_slice(&seg);
167        }
168        out.freeze()
169    }
170
171    fn flush(&mut self) {
172        if !self.current.is_empty() {
173            self.segments.push(self.current.split().freeze());
174        }
175    }
176}