Skip to main content

hashiverse_lib/client/client_storage/
mem_client_storage.rs

1//! # In-memory [`ClientStorage`] implementation
2//!
3//! A `HashMap<String, HashMap<String, Entry>>` that implements the
4//! [`crate::client::client_storage::client_storage::ClientStorage`] trait. Each inner map is
5//! one bucket; each `Entry` carries the compressed value and a `last_accessed` timestamp.
6//! When a bucket exceeds its [`crate::client::client_storage::client_storage::BUCKET_TRIMS`]
7//! cap, the oldest-accessed entries are dropped.
8//!
9//! Used by tests (including the integration-test harness) and by any deployment that
10//! explicitly opts out of on-disk persistence.
11
12use crate::client::client_storage::client_storage::{ClientStorage, BUCKETS};
13use crate::tools::time::TimeMillis;
14use anyhow::Context;
15use std::collections::HashMap;
16use std::sync::Arc;
17use parking_lot::RwLock;
18
19struct Entry {
20    value: Vec<u8>,
21    last_accessed: TimeMillis,
22}
23
24pub struct MemClientStorage {
25    buckets: Arc<RwLock<HashMap<String, HashMap<String, Entry>>>>,
26}
27
28impl MemClientStorage {
29    pub async fn new() -> anyhow::Result<Arc<Self>> {
30        let mut buckets = HashMap::new();
31        for bucket in BUCKETS {
32            buckets.insert(bucket.to_string(), HashMap::new());
33        }
34
35        Ok(Arc::new(Self { buckets: Arc::new(RwLock::new(buckets)) }))
36    }
37}
38
39#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
40#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
41impl ClientStorage for MemClientStorage {
42    async fn count(&self, bucket: &str) -> anyhow::Result<usize> {
43        let buckets = self.buckets.read();
44        let bucket = buckets.get(bucket).context(format!("Bucket not found: {}", bucket))?;
45        Ok(bucket.len())
46    }
47
48    async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>> {
49        let buckets = self.buckets.read();
50        let bucket = buckets.get(bucket).context(format!("Bucket not found: {}", bucket))?;
51        Ok(bucket.keys().cloned().collect())
52    }
53
54    async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>> {
55        let mut buckets = self.buckets.write();
56        let bucket = buckets.get_mut(bucket).context(format!("Bucket not found: {}", bucket))?;
57        match bucket.get_mut(key) {
58            Some(entry) => {
59                if time_millis > TimeMillis::zero() {
60                    entry.last_accessed = time_millis;
61                }
62                Ok(Some(entry.value.clone()))
63            }
64            None => Ok(None),
65        }
66    }
67
68    async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()> {
69        let mut buckets = self.buckets.write();
70        let bucket = buckets.get_mut(bucket).context(format!("Bucket not found: {}", bucket))?;
71        bucket.insert(key.to_string(), Entry { value, last_accessed: time_millis });
72        Ok(())
73    }
74
75    async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()> {
76        let mut buckets = self.buckets.write();
77        let bucket = buckets.get_mut(bucket).context(format!("Bucket not found: {}", bucket))?;
78        bucket.remove(key);
79        Ok(())
80    }
81
82    async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()> {
83        let mut buckets = self.buckets.write();
84        let bucket = buckets.get_mut(bucket).context(format!("Bucket not found: {}", bucket))?;
85        if bucket.len() > max_count {
86            let num_to_delete = bucket.len() - max_count;
87            let mut entries: Vec<(&String, TimeMillis)> = bucket.iter().map(|(k, e)| (k, e.last_accessed)).collect();
88            entries.sort_by_key(|(_, t)| *t);
89            let keys_to_delete: Vec<String> = entries.into_iter().take(num_to_delete).map(|(k, _)| k.clone()).collect();
90            for key in keys_to_delete {
91                bucket.remove(&key);
92            }
93        }
94        Ok(())
95    }
96
97    async fn reset(&self) -> anyhow::Result<()> {
98        let mut buckets = self.buckets.write();
99        for bucket in BUCKETS {
100            let bucket = buckets.get_mut(*bucket).context(format!("Bucket not found: {}", bucket))?;
101            bucket.clear();
102        }
103        Ok(())
104    }
105}
106
107#[cfg(test)]
108mod tests {
109    use crate::client::client_storage::client_storage;
110    use crate::client::client_storage::mem_client_storage::MemClientStorage;
111
112    #[tokio::test]
113    async fn add_test() {
114        client_storage::tests::add_test(MemClientStorage::new().await.unwrap()).await;
115    }
116
117    #[tokio::test]
118    async fn trim_test() {
119        client_storage::tests::trim_test(MemClientStorage::new().await.unwrap()).await;
120    }
121}