Skip to main content

hashiverse_lib/client/client_storage/
client_storage.rs

1//! # Pluggable client-side persistence
2//!
3//! `ClientStorage` is the abstract key-value store every higher-level client subsystem
4//! writes through: peers ([`crate::client::peer_tracker`]), cached post bundles and
5//! feedback, the meta-post / account config, and so on. The keyspace is partitioned into
6//! named *buckets* (see [`BUCKETS`]), each with its own LRU cap enforced by
7//! [`BUCKET_TRIMS`], so one subsystem cannot evict another's working set.
8//!
9//! Entries are Zstd-compressed and each carries a `last_accessed` timestamp so buckets
10//! can evict their coldest entries on overflow. Passing [`TimeMillis::zero()`] on a read
11//! is a sentinel meaning "don't touch LRU" — used by cache-warming paths so refreshing a
12//! cold cache does not reshuffle the recency ordering.
13//!
//! Typed convenience helpers in this module take a `&dyn ClientStorage`: `put_struct` /
//! `get_struct` handle serde + compression in one step so callers don't hand-roll the same
//! three lines at each site, while `put_str` / `get_str` store the string's UTF-8 bytes as-is.
17//!
18//! Three implementations ship with the crate:
19//! - [`crate::client::client_storage::mem_client_storage`] — pure in-memory, tests and
20//!   lightweight deployments.
21//! - [`crate::client::client_storage::sqlite_client_storage`] — on-disk SQLite with WAL,
22//!   native servers and desktop clients.
23//! - (In the WASM client crate) an IndexedDB implementation lives alongside the web client.
24
25use crate::tools::time::TimeMillis;
26use serde::de::DeserializeOwned;
27use serde::Serialize;
28use crate::tools::{compression, json};
29use crate::tools::types::Id;
30
/// Bucket name for configuration entries.
pub const BUCKET_CONFIG: &str = "config";
/// Key string for the public meta-post (v1); presumably used within [`BUCKET_CONFIG`] — confirm against callers.
pub const BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC: &str = "meta_post_v1_public";
/// Key string for the private meta-post (v1); presumably used within [`BUCKET_CONFIG`] — confirm against callers.
pub const BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE: &str = "meta_post_v1_private";
/// Bucket name for peer entries.
pub const BUCKET_PEER: &str = "peer";
/// Bucket name for public meta-post entries.
pub const BUCKET_META_POST_PUBLIC: &str = "meta_post_public";
/// Bucket name for post bundle entries.
pub const BUCKET_POST_BUNDLE: &str = "post_bundle";
/// Bucket name for post bundle feedback entries.
pub const BUCKET_POST_BUNDLE_FEEDBACK: &str = "post_bundle_feedback";
/// Every bucket name. The order here differs from the declaration order above and is what
/// [`BUCKET_TRIMS`] is expected to line up with.
pub const BUCKETS: &[&str] = &[BUCKET_CONFIG, BUCKET_PEER, BUCKET_POST_BUNDLE, BUCKET_POST_BUNDLE_FEEDBACK, BUCKET_META_POST_PUBLIC];
/// Per-bucket entry caps, one per entry of [`BUCKETS`] (a unit test asserts the two slices have
/// equal length). Paired by index this reads: config = 0, peer = 1024, post_bundle = 512,
/// post_bundle_feedback = 512, meta_post_public = 2048.
///
/// NOTE(review): the `0` for the config bucket presumably means "never trim" and must be
/// special-cased by whoever applies these caps — calling `trim(bucket, 0)` empties a bucket
/// (see `trim_test`). Confirm at the call site.
pub const BUCKET_TRIMS: &[usize] = &[0, 1024, 512, 512, 2048];
40
41pub fn config_key_for_user(id: Id, key: &str) -> String {
42    format!("{}.{}", id.to_hex_str(), key)
43}
44
/// The persistence backend for a [`crate::client::hashiverse_client::HashiverseClient`].
///
/// `ClientStorage` is a bucketed, LRU-evictable key/value store: callers name a bucket
/// (`BUCKET_POST_BUNDLE`, `BUCKET_PEER`, `BUCKET_CONFIG`, …) and operate on byte values
/// under string keys. The per-entry `time_millis` argument controls LRU freshening; passing
/// `TimeMillis::zero()` reads an entry without touching its access time, which matters when
/// the client is warming caches and must not perturb eviction order.
///
/// Implementations are swappable per target; see the module-level docs for the ones that
/// ship with the crate.
/// NOTE(review): this paragraph previously said the server uses a "sled-backed filesystem
/// store", which conflicts with the module docs naming a SQLite implementation — confirm
/// which is current.
///
/// Typed convenience helpers live alongside the trait as free functions, so trait
/// implementors only ever see raw bytes: `put_struct` / `get_struct` serialize through
/// `json` and compress through `compression`, while `put_str` / `get_str` pass the string's
/// UTF-8 bytes through unchanged.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait ClientStorage: Send + Sync {
    /// Returns the number of entries currently stored in `bucket`.
    async fn count(&self, bucket: &str) -> anyhow::Result<usize>;
    /// Returns the keys stored in `bucket`. No ordering is specified by this trait.
    async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>>;

    /// Gets an item from the client storage, or `None` if `key` is not present in `bucket`.
    ///
    /// Note that `time_millis` is used for LRU eviction. If you pass in `TimeMillis::zero()`, the record will not be 'touched', and so will not be freshened for the LRU eviction.
    /// This is useful e.g. for loading all the records into a cache - we don't want to alter their last accessed time...
    async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>>;
    /// Stores `value` under `key` in `bucket`, replacing any existing value for that key.
    ///
    /// `time_millis` becomes the entry's last-accessed time for LRU purposes.
    async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()>;
    /// Deletes the entry stored under `key` in `bucket`.
    async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()>;
    /// Evicts the least-recently-accessed entries of `bucket` until at most `max_count` remain.
    ///
    /// A `max_count` at or above the current count is a no-op; `0` empties the bucket.
    async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()>;
    /// Clears the store; callers (and the shared tests) expect buckets to be empty afterwards.
    async fn reset(&self) -> anyhow::Result<()>;
}
73
74/// Extension methods for ClientStorage that provide typed access
75pub async fn put_str(storage: &dyn ClientStorage, bucket: &str, key: &str, value: String, time_millis: TimeMillis) -> anyhow::Result<()> {
76    storage.put(bucket, key, value.into_bytes(), time_millis).await
77}
78
79pub async fn get_str(storage: &dyn ClientStorage, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<String>> {
80    let result = storage.get(bucket, key, time_millis).await?;
81    match result {
82        Some(bytes) => Ok(Some(String::from_utf8(bytes)?)),
83        None => Ok(None),
84    }
85}
86
87pub async fn put_struct<T: Serialize>(storage: &dyn ClientStorage, bucket: &str, key: &str, value: &T, time_millis: TimeMillis) -> anyhow::Result<()> {
88    let bytes = json::struct_to_bytes(value)?;
89    let bytes_compressed = compression::compress_for_size(&bytes)?.to_bytes();
90    storage.put(bucket, key, bytes_compressed.to_vec(), time_millis).await
91}
92
93pub async fn get_struct<T: DeserializeOwned>(storage: &dyn ClientStorage, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<T>> {
94    let result = storage.get(bucket, key, time_millis).await?;
95    match result {
96        Some(bytes_compressed) => {
97            let bytes = compression::decompress(&bytes_compressed)?.to_bytes();
98            let value = json::bytes_to_struct::<T>(&bytes)?;
99            Ok(Some(value))
100        }
101        None => Ok(None),
102    }
103}
104
/// Backend-agnostic checks for [`ClientStorage`] implementations.
///
/// `trim_test` and `add_test` are `pub` and take an `Arc<dyn ClientStorage>`, and the module
/// is also compiled under the `generic-tests` feature — presumably so each backend's own
/// test suite can run them against its implementation.
#[cfg(any(test, feature = "generic-tests"))]
pub mod tests {
    use super::*;
    use std::sync::Arc;

    /// `BUCKETS` and `BUCKET_TRIMS` are parallel slices; catch one being edited without the other.
    #[test]
    fn buckets_and_bucket_trims_have_equal_length() {
        assert_eq!(BUCKETS.len(), BUCKET_TRIMS.len(), "BUCKETS has {} entries but BUCKET_TRIMS has {} entries", BUCKETS.len(), BUCKET_TRIMS.len());
    }

    /// Exercises LRU trimming: oldest-first eviction, freshening on `get`, the
    /// `TimeMillis::zero()` "don't touch" read, and the no-op / trim-to-zero edge cases.
    ///
    /// Resets the storage before and after, so it must not be pointed at live data.
    /// Panics (with the full error chain) if any storage call fails.
    pub async fn trim_test(cs: Arc<dyn ClientStorage>) {
        let result: anyhow::Result<()> = try {
            cs.reset().await?;

            // Put 5 entries with ascending timestamps (= ascending last_accessed)
            put_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key1", "val1".to_string(), TimeMillis(10)).await?;
            put_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key2", "val2".to_string(), TimeMillis(20)).await?;
            put_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key3", "val3".to_string(), TimeMillis(30)).await?;
            put_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key4", "val4".to_string(), TimeMillis(40)).await?;
            put_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key5", "val5".to_string(), TimeMillis(50)).await?;
            assert_eq!(5, cs.count(BUCKET_POST_BUNDLE).await?);

            // Trim to 3 — key1 (ts=10) and key2 (ts=20) are the oldest, should be removed.
            // The verification reads use TimeMillis::zero() so they do not disturb the LRU order.
            cs.trim(BUCKET_POST_BUNDLE, 3).await?;
            assert_eq!(3, cs.count(BUCKET_POST_BUNDLE).await?);
            assert_eq!(None, get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key1", TimeMillis::zero()).await?);
            assert_eq!(None, get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key2", TimeMillis::zero()).await?);
            assert_eq!(Some("val3".to_string()), get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key3", TimeMillis::zero()).await?);
            assert_eq!(Some("val4".to_string()), get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key4", TimeMillis::zero()).await?);
            assert_eq!(Some("val5".to_string()), get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key5", TimeMillis::zero()).await?);

            // Get key3 with a fresh timestamp — it is now the newest
            get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key3", TimeMillis(60)).await?;

            // Trim to 2 — order is now key4 (ts=40), key5 (ts=50), key3 (ts=60); key4 should be removed
            cs.trim(BUCKET_POST_BUNDLE, 2).await?;
            assert_eq!(2, cs.count(BUCKET_POST_BUNDLE).await?);
            assert_eq!(None, get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key4", TimeMillis::zero()).await?);
            assert_eq!(Some("val5".to_string()), get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key5", TimeMillis::zero()).await?);
            assert_eq!(Some("val3".to_string()), get_str(cs.as_ref(), BUCKET_POST_BUNDLE, "key3", TimeMillis::zero()).await?);

            // Trim to more than count — no-op
            cs.trim(BUCKET_POST_BUNDLE, 10).await?;
            assert_eq!(2, cs.count(BUCKET_POST_BUNDLE).await?);

            // Trim to zero
            cs.trim(BUCKET_POST_BUNDLE, 0).await?;
            assert_eq!(0, cs.count(BUCKET_POST_BUNDLE).await?);

            cs.reset().await?;
        };

        if let Err(e) = result {
            // `{:?}` on an anyhow error prints the whole cause chain, not just the outermost message.
            panic!("trim_test failed: {:?}", e);
        }
    }

    /// Exercises the basic CRUD contract: put, count, get, overwrite, missing key, remove, reset.
    ///
    /// Resets the storage before and after, so it must not be pointed at live data.
    /// Panics (with the full error chain) if any storage call fails.
    pub async fn add_test(client_storage: Arc<dyn ClientStorage>) {
        // Annotated like `trim_test` so the block's result type does not rely on inference.
        let result: anyhow::Result<()> = try {
            // Start clean
            client_storage.reset().await?;
            assert_eq!(0, client_storage.count(BUCKET_POST_BUNDLE).await?);

            // Test a put
            {
                put_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key1", "val1".to_string(), TimeMillis(0)).await?;
                put_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key2", "val2".to_string(), TimeMillis(1)).await?;
                put_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key3", "val3".to_string(), TimeMillis(2)).await?;
            }

            // Test a count
            assert_eq!(3, client_storage.count(BUCKET_POST_BUNDLE).await?);

            // Test a get
            {
                let x = get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key1", TimeMillis(3)).await?;
                assert_eq!(Some("val1".to_string()), x);
            }

            // Test an update: same key, new value — the count must not grow
            put_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key1", "val4".to_string(), TimeMillis(4)).await?;
            assert_eq!(3, client_storage.count(BUCKET_POST_BUNDLE).await?);
            {
                let x = get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key1", TimeMillis(5)).await?;
                assert_eq!(Some("val4".to_string()), x);
            }

            // Test a missing
            {
                let x = get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key_none", TimeMillis(6)).await?;
                assert_eq!(None, x);
            }

            // Test a delete
            {
                client_storage.remove(BUCKET_POST_BUNDLE, "key1").await?;
                assert_eq!(2, client_storage.count(BUCKET_POST_BUNDLE).await?);

                // The count alone cannot tell WHICH entry was deleted: check the removed key
                // is gone and the other two survived.
                assert_eq!(None, get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key1", TimeMillis::zero()).await?);
                assert_eq!(Some("val2".to_string()), get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key2", TimeMillis::zero()).await?);
                assert_eq!(Some("val3".to_string()), get_str(client_storage.as_ref(), BUCKET_POST_BUNDLE, "key3", TimeMillis::zero()).await?);
            }

            // End clean
            client_storage.reset().await?;
            assert_eq!(0, client_storage.count(BUCKET_POST_BUNDLE).await?);
        };

        if let Err(e) = result {
            // `{:?}` on an anyhow error prints the whole cause chain, not just the outermost message.
            panic!("Test failed: {:?}", e);
        }
    }
}
213}