Skip to main content

hashiverse_lib/tools/
compression.rs

//! # Versioned compression helpers
//!
//! Every compressed payload produced here is self-describing: the first byte is a
//! **compression version** that names the algorithm (`0` = passthrough, `1` = lz4,
//! `2` = brotli). This lets [`decompress`] dispatch on that byte alone, so callers only
//! need to know which *kind* of compression to aim for, not which algorithm they'll
//! actually get back.
//!
//! Two algorithms are exposed, picked by the caller based on expected access pattern:
//!
//! - [`compress_for_speed`] — **lz4-flex**. For RPC wire traffic: the payload is
//!   compressed once per request and decompressed once per response. Fast, low CPU, modest
//!   ratio. Falls back to passthrough if the input is below `MIN_BYTES_FOR_LZ4` (64 B) or
//!   if the compressed form would not be smaller than the input.
//! - [`compress_for_size`] — **brotli q11**. For write-once / read-many post storage on
//!   servers: spend CPU once at write time in exchange for the smallest bytes-on-disk and
//!   bytes-on-wire when the post is later served. Same passthrough fallback at
//!   `MIN_BYTES_FOR_BROTLI` (128 B).
//!
//! ## Defence against decompression bombs
//!
//! Both the lz4 and the brotli decompress paths cap output at `MAX_DECOMPRESSED_SIZE`
//! (32 MiB, with headroom over the protocol response limit) so a malicious tiny payload
//! cannot OOM a peer.
24
25use crate::tools::BytesGatherer;
26use bytes::{BufMut, Bytes, BytesMut};
27use std::io::{Read, Write};
28
/// Version byte for payloads stored verbatim (no compression).
///
/// One of these version bytes is prepended to every payload, which lets the decompressor
/// evolve the algorithm independently of callers.
const COMPRESSION_VERSION_NONE: u8 = 0;
/// Version byte for lz4-compressed payloads.
const COMPRESSION_VERSION_LZ4: u8 = 1;
/// Version byte for brotli-compressed payloads.
const COMPRESSION_VERSION_BROTLI: u8 = 2;

/// Inputs shorter than this are stored verbatim instead of being lz4-compressed: on tiny
/// inputs the 5-byte header (version + u32 length) plus lz4's own overhead rarely yields a
/// net win.
const MIN_BYTES_FOR_LZ4: usize = 64;

/// Inputs shorter than this are stored verbatim instead of being brotli-compressed: on tiny
/// inputs brotli's block overhead rarely yields a net win.
const MIN_BYTES_FOR_BROTLI: usize = 128;

/// Upper bound (32 MiB) on decompressed output, guarding against decompression bombs.
/// Chosen as the largest protocol blob limit (response = 16 MB) plus headroom.
const MAX_DECOMPRESSED_SIZE: usize = 32 * 1024 * 1024;
44
45fn compress_passthrough(input: &[u8]) -> anyhow::Result<BytesGatherer> {
46    let mut bytes_gatherer = BytesGatherer::default();
47    bytes_gatherer.put_u8(COMPRESSION_VERSION_NONE);
48    bytes_gatherer.put_slice(input);
49    Ok(bytes_gatherer)
50}
51
52fn decompress_passthrough(input: &[u8]) -> anyhow::Result<BytesGatherer> {
53    Ok(BytesGatherer::from_bytes(Bytes::copy_from_slice(&input[1..])))
54}
55
56fn compress_lz4(input: &[u8]) -> anyhow::Result<BytesGatherer> {
57    if input.len() < MIN_BYTES_FOR_LZ4 {
58        return compress_passthrough(input);
59    }
60
61    // Layout: [version(1)][uncompressed_len u32 le(4)][compressed bytes]
62    // compress_into writes directly into the pre-allocated tail — no intermediate Vec.
63    let max_out = lz4_flex::block::get_maximum_output_size(input.len());
64    let mut result = BytesMut::with_capacity(5 + max_out);
65    result.put_u8(COMPRESSION_VERSION_LZ4);
66    result.put_slice(&(input.len() as u32).to_le_bytes());
67    let data_start = result.len(); // 5
68    result.resize(data_start + max_out, 0);
69    let n = lz4_flex::block::compress_into(input, &mut result[data_start..]).map_err(|e| anyhow::anyhow!("lz4 compression failed: {}", e))?;
70    result.truncate(data_start + n);
71    Ok(BytesGatherer::from_bytes(result.freeze()))
72}
73
74fn decompress_lz4(input: &[u8]) -> anyhow::Result<BytesGatherer> {
75    let data = &input[1..];
76    if data.len() < 4 {
77        anyhow::bail!("lz4 decompression failed: missing size prefix");
78    }
79    let uncompressed_size = u32::from_le_bytes(data[..4].try_into().unwrap()) as usize;
80    if uncompressed_size > MAX_DECOMPRESSED_SIZE {
81        anyhow::bail!("lz4 decompressed size {} exceeds limit {}", uncompressed_size, MAX_DECOMPRESSED_SIZE);
82    }
83    lz4_flex::decompress_size_prepended(data).map(|v| BytesGatherer::from_bytes(Bytes::from(v))).map_err(|e| anyhow::anyhow!("lz4 decompression failed: {}", e))
84}
85
86fn compress_brotli(input: &[u8]) -> anyhow::Result<BytesGatherer> {
87    if input.len() < MIN_BYTES_FOR_BROTLI {
88        return compress_passthrough(input);
89    }
90
91    // Quality 11 (max), lgwin 22 (4 MB window) — optimal for write-once post storage.
92    // CompressorWriter wraps &mut Vec<u8> (impl Write) and appends directly — no intermediate buffer.
93    let mut result = vec![COMPRESSION_VERSION_BROTLI];
94    {
95        let mut writer = brotli::CompressorWriter::new(&mut result, 4096, 11, 22);
96        writer.write_all(input)?;
97    }
98    Ok(BytesGatherer::from_bytes(Bytes::from(result)))
99}
100
101fn decompress_brotli(input: &[u8]) -> anyhow::Result<BytesGatherer> {
102    let mut output = Vec::new();
103    let bytes_read = brotli::Decompressor::new(&input[1..], 4096).take(MAX_DECOMPRESSED_SIZE as u64 + 1).read_to_end(&mut output)?;
104    if bytes_read > MAX_DECOMPRESSED_SIZE {
105        anyhow::bail!("brotli decompressed size {} exceeds limit {}", bytes_read, MAX_DECOMPRESSED_SIZE);
106    }
107    Ok(BytesGatherer::from_bytes(Bytes::from(output)))
108}
109
110/// Compress using lz4 (fast). Use for RPC wire transfer.
111///
112/// Falls back to verbatim (version 0) if:
113/// - input is below the minimum threshold, or
114/// - the compressed form would not be smaller than the original.
115pub fn compress_for_speed(input: &[u8]) -> anyhow::Result<BytesGatherer> {
116    let result = compress_lz4(input)?;
117    if result.len() < input.len() { Ok(result) } else { compress_passthrough(input) }
118}
119
120/// Compress using brotli (small). Use for post storage (compress once, read many).
121///
122/// Falls back to verbatim (version 0) if:
123/// - input is below the minimum threshold, or
124/// - the compressed form would not be smaller than the original.
125pub fn compress_for_size(input: &[u8]) -> anyhow::Result<BytesGatherer> {
126    let result = compress_brotli(input)?;
127    if result.len() < input.len() { Ok(result) } else { compress_passthrough(input) }
128}
129
130/// Decompress any payload produced by [`compress_for_speed`] or [`compress_for_size`].
131///
132/// Dispatches on the leading version byte.
133pub fn decompress(input: &[u8]) -> anyhow::Result<BytesGatherer> {
134    if input.is_empty() {
135        anyhow::bail!("missing compression version byte");
136    }
137    match input[0] {
138        COMPRESSION_VERSION_LZ4 => decompress_lz4(input),
139        COMPRESSION_VERSION_BROTLI => decompress_brotli(input),
140        COMPRESSION_VERSION_NONE => decompress_passthrough(input),
141        v => anyhow::bail!("unsupported compression version byte {}", v),
142    }
143}
144
145
146
147
#[cfg(test)]
mod tests {
    use crate::tools::compression::{compress_for_size, compress_for_speed, decompress};
    use crate::tools::tools;

    #[cfg(target_arch = "wasm32")]
    extern crate wasm_bindgen_test;
    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::*;
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Round-trips `input` through `compress_for_speed` + `decompress` and asserts equality.
    fn roundtrip_speed(input: &[u8], msg: &str) -> anyhow::Result<()> {
        let compressed = compress_for_speed(input)?.to_bytes();
        let output = decompress(&compressed)?.to_bytes();
        assert_eq!(input, output.as_ref(), "{}", msg);
        Ok(())
    }

    /// Round-trips `input` through `compress_for_size` + `decompress` and asserts equality.
    fn roundtrip_size(input: &[u8], msg: &str) -> anyhow::Result<()> {
        let compressed = compress_for_size(input)?.to_bytes();
        let output = decompress(&compressed)?.to_bytes();
        assert_eq!(input, output.as_ref(), "{}", msg);
        Ok(())
    }

    /// Decompresses `payload`, which must be rejected, and returns the error text.
    fn decompress_error(payload: &[u8]) -> String {
        decompress(payload).err().expect("should have failed").to_string()
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_compression_is_reversible() -> anyhow::Result<()> {
        // The input must clear both MIN_BYTES_FOR_LZ4 and MIN_BYTES_FOR_BROTLI, otherwise this
        // test only exercises the passthrough fallback (covered by the "short" test below).
        let input = "Some example string to test compression and decompression. ".repeat(4);
        assert!(input.len() >= super::MIN_BYTES_FOR_LZ4, "input too short to exercise lz4");
        assert!(input.len() >= super::MIN_BYTES_FOR_BROTLI, "input too short to exercise brotli");
        let input = input.as_bytes();

        // Confirm the real codecs were selected, not passthrough.
        let speed_payload = compress_for_speed(input)?.to_bytes();
        assert_eq!(speed_payload.first().copied(), Some(super::COMPRESSION_VERSION_LZ4), "expected an lz4 payload");
        let size_payload = compress_for_size(input)?.to_bytes();
        assert_eq!(size_payload.first().copied(), Some(super::COMPRESSION_VERSION_BROTLI), "expected a brotli payload");

        roundtrip_speed(input, "lz4 roundtrip")?;
        roundtrip_size(input, "brotli roundtrip")?;
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_compression_is_reversible_short() -> anyhow::Result<()> {
        // Below MIN_BYTES_FOR_LZ4 and MIN_BYTES_FOR_BROTLI — both should passthrough
        let input = b"Some...";
        roundtrip_speed(input, "lz4 short passthrough")?;
        roundtrip_size(input, "brotli short passthrough")?;
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_compression_is_reversible_empty() -> anyhow::Result<()> {
        let input = b"";
        roundtrip_speed(input, "lz4 empty passthrough")?;
        roundtrip_size(input, "brotli empty passthrough")?;
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_compression_is_reversible_random() -> anyhow::Result<()> {
        // Random data is incompressible; both should fall back to passthrough.
        // Use lz4 path only — brotli quality 11 on 64 MB of random data is too slow for a test.
        const LENGTH: usize = 8192 * 8192;
        let input: Vec<u8> = tools::random_bytes(LENGTH);
        roundtrip_speed(&input, "lz4 random passthrough")?;
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_brotli_actually_compresses_html() -> anyhow::Result<()> {
        // Brotli should produce a smaller result on compressible text.
        let input = "<!DOCTYPE html><html><head><title>Test</title></head><body>".repeat(50);
        let compressed = compress_for_size(input.as_bytes())?.to_bytes();
        assert!(
            compressed.len() < input.len(),
            "brotli should compress repetitive HTML: {} -> {}",
            input.len(),
            compressed.len()
        );
        let output = decompress(&compressed)?.to_bytes();
        assert_eq!(input.as_bytes(), output.as_ref());
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_lz4_rejects_oversized_decompressed_payload() {
        // Craft an lz4 payload with a claimed uncompressed size exceeding the limit.
        // Layout: [version=1][uncompressed_len u32 le][compressed data...]
        let fake_size: u32 = (super::MAX_DECOMPRESSED_SIZE as u32) + 1;
        let mut payload = vec![super::COMPRESSION_VERSION_LZ4];
        payload.extend_from_slice(&fake_size.to_le_bytes());
        payload.extend_from_slice(&[0u8; 16]); // garbage compressed data
        let error_message = decompress_error(&payload);
        assert!(error_message.contains("exceeds limit"), "unexpected error: {}", error_message);
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_lz4_rejects_truncated_size_prefix() {
        // Version byte followed by fewer than the 4 bytes needed for the size prefix.
        let payload = [super::COMPRESSION_VERSION_LZ4, 0, 0];
        let error_message = decompress_error(&payload);
        assert!(error_message.contains("missing size prefix"), "unexpected error: {}", error_message);
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_decompress_rejects_empty_input() {
        // With no bytes at all there is no version byte to dispatch on.
        let error_message = decompress_error(&[]);
        assert!(error_message.contains("missing compression version byte"), "unexpected error: {}", error_message);
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_decompress_rejects_unknown_version_byte() {
        // 0xFF is not one of the defined compression versions.
        let error_message = decompress_error(&[0xFF, 1, 2, 3]);
        assert!(error_message.contains("unsupported compression version byte"), "unexpected error: {}", error_message);
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_lz4_accepts_valid_decompressed_payload() -> anyhow::Result<()> {
        // A normal roundtrip should still work fine
        let input = "hello world! ".repeat(100);
        let compressed = compress_for_speed(input.as_bytes())?.to_bytes();
        let output = decompress(&compressed)?.to_bytes();
        assert_eq!(input.as_bytes(), output.as_ref());
        Ok(())
    }

    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    async fn test_lz4_actually_compresses_text() -> anyhow::Result<()> {
        // lz4 should produce a smaller result on compressible text.
        let input = "The quick brown fox jumps over the lazy dog. ".repeat(100);
        let compressed = compress_for_speed(input.as_bytes())?.to_bytes();
        assert!(
            compressed.len() < input.len(),
            "lz4 should compress repetitive text: {} -> {}",
            input.len(),
            compressed.len()
        );
        let output = decompress(&compressed)?.to_bytes();
        assert_eq!(input.as_bytes(), output.as_ref());
        Ok(())
    }
}