Skip to main content

hashiverse_lib/client/client_storage/
sqlite_client_storage.rs

1//! # SQLite-backed [`ClientStorage`] implementation
2//!
3//! File-backed persistence using `rusqlite`. One table per
4//! [`crate::client::client_storage::client_storage::BUCKETS`] entry, each with
5//! `(key TEXT PRIMARY KEY, value BLOB, last_accessed INTEGER)`. Opened with WAL mode
6//! (concurrent readers while a writer commits) and `PRAGMA synchronous = NORMAL` — safe
7//! for local cache use, not for power-loss durability.
8//!
//! A `parking_lot::Mutex` serialises all SQLite access so we never have two threads
10//! hitting the same connection at once. This is the store the native server binary and
11//! the desktop reference client use.
12
13use crate::client::client_storage::client_storage::{ClientStorage, BUCKETS};
14use crate::tools::time::TimeMillis;
15use anyhow::Context;
16use rusqlite::Connection;
17use std::path::PathBuf;
18use std::sync::Arc;
19use parking_lot::Mutex;
20
/// [`ClientStorage`] implementation that keeps every bucket in one SQLite database file.
///
/// Instances are created through [`SqliteClientStorage::new`], which opens the database
/// and creates one table per entry in [`BUCKETS`].
pub struct SqliteClientStorage {
    /// The single SQLite connection. Every storage operation locks this mutex before
    /// issuing SQL, so statements never run concurrently on the connection.
    connection: Arc<Mutex<Connection>>,
}
24
25impl SqliteClientStorage {
26    /// Creates a new SqliteClientStorage at `{data_dir}/client_storage.db`.
27    /// The `data_dir` directory must already exist.
28    pub async fn new(data_dir: PathBuf) -> anyhow::Result<Arc<Self>> {
29        let database_path = data_dir.join("client_storage.db");
30        let connection = Connection::open(&database_path)
31            .with_context(|| format!("Failed to open SQLite database at {}", database_path.display()))?;
32
33        // WAL mode allows concurrent readers and writers without blocking.
34        // NORMAL sync reduces fsync calls for better write performance — safe against
35        // process crashes but not OS/power crashes (acceptable for a local cache).
36        connection.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
37
38        for bucket in BUCKETS {
39            let table_name = sanitize_bucket_name(bucket);
40            connection.execute_batch(&format!(
41                "CREATE TABLE IF NOT EXISTS [{table_name}] (
42                    key TEXT PRIMARY KEY,
43                    value BLOB NOT NULL,
44                    last_accessed INTEGER NOT NULL
45                )"
46            ))?;
47        }
48
49        Ok(Arc::new(Self {
50            connection: Arc::new(Mutex::new(connection)),
51        }))
52    }
53}
54
/// Turns a bucket name into a string that is safe to splice into SQL as a table name.
///
/// Alphanumeric characters and `_` are kept as they are; every other character is
/// replaced by a single `_`.
fn sanitize_bucket_name(bucket: &str) -> String {
    bucket
        .chars()
        .map(|ch| if ch == '_' || ch.is_alphanumeric() { ch } else { '_' })
        .collect()
}
58
59#[async_trait::async_trait]
60impl ClientStorage for SqliteClientStorage {
61    async fn count(&self, bucket: &str) -> anyhow::Result<usize> {
62        let table_name = sanitize_bucket_name(bucket);
63        let connection = self.connection.lock();
64        let count: usize = connection.query_row(
65            &format!("SELECT COUNT(*) FROM [{table_name}]"),
66            [],
67            |row| row.get(0),
68        )?;
69        Ok(count)
70    }
71
72    async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>> {
73        let table_name = sanitize_bucket_name(bucket);
74        let connection = self.connection.lock();
75        let mut statement = connection.prepare(&format!("SELECT key FROM [{table_name}]"))?;
76        let keys = statement
77            .query_map([], |row| row.get::<_, String>(0))?
78            .collect::<Result<Vec<_>, _>>()?;
79        Ok(keys)
80    }
81
82    async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>> {
83        let table_name = sanitize_bucket_name(bucket);
84        let connection = self.connection.lock();
85
86        let result: Option<Vec<u8>> = connection
87            .query_row(
88                &format!("SELECT value FROM [{table_name}] WHERE key = ?1"),
89                [key],
90                |row| row.get(0),
91            )
92            .optional()?;
93
94        if result.is_some() && time_millis > TimeMillis::zero() {
95            connection.execute(
96                &format!("UPDATE [{table_name}] SET last_accessed = ?1 WHERE key = ?2"),
97                rusqlite::params![time_millis.0, key],
98            )?;
99        }
100
101        Ok(result)
102    }
103
104    async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()> {
105        let table_name = sanitize_bucket_name(bucket);
106        let connection = self.connection.lock();
107        connection.execute(
108            &format!("INSERT OR REPLACE INTO [{table_name}] (key, value, last_accessed) VALUES (?1, ?2, ?3)"),
109            rusqlite::params![key, value, time_millis.0],
110        )?;
111        Ok(())
112    }
113
114    async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()> {
115        let table_name = sanitize_bucket_name(bucket);
116        let connection = self.connection.lock();
117        connection.execute(
118            &format!("DELETE FROM [{table_name}] WHERE key = ?1"),
119            [key],
120        )?;
121        Ok(())
122    }
123
124    async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()> {
125        let table_name = sanitize_bucket_name(bucket);
126        let connection = self.connection.lock();
127
128        let count: usize = connection.query_row(
129            &format!("SELECT COUNT(*) FROM [{table_name}]"),
130            [],
131            |row| row.get(0),
132        )?;
133
134        if count > max_count {
135            let num_to_delete = count - max_count;
136            connection.execute(
137                &format!("DELETE FROM [{table_name}] WHERE key IN (SELECT key FROM [{table_name}] ORDER BY last_accessed ASC LIMIT ?1)"),
138                [num_to_delete],
139            )?;
140        }
141
142        Ok(())
143    }
144
145    async fn reset(&self) -> anyhow::Result<()> {
146        let connection = self.connection.lock();
147        for bucket in BUCKETS {
148            let table_name = sanitize_bucket_name(bucket);
149            connection.execute_batch(&format!("DELETE FROM [{table_name}]"))?;
150        }
151        Ok(())
152    }
153}
154
155use rusqlite::OptionalExtension;
156
#[cfg(test)]
mod tests {
    //! Runs the shared `ClientStorage` test suite against the SQLite backend, each test
    //! using a database in its own temporary directory.

    use super::SqliteClientStorage;
    use crate::client::client_storage::client_storage::tests as shared_tests;
    use crate::tools::tools::get_temp_dir;

    /// Shared "add" scenario against a freshly created SQLite store.
    #[tokio::test]
    async fn add_test() {
        // The first tuple element is bound (not discarded) so it lives until the test ends.
        let (_temp_dir_guard, db_dir) = get_temp_dir().unwrap();
        let sqlite_storage = SqliteClientStorage::new(db_dir.into()).await;
        let sqlite_storage = sqlite_storage.unwrap();
        shared_tests::add_test(sqlite_storage).await;
    }

    /// Shared "trim" scenario against a freshly created SQLite store.
    #[tokio::test]
    async fn trim_test() {
        // The first tuple element is bound (not discarded) so it lives until the test ends.
        let (_temp_dir_guard, db_dir) = get_temp_dir().unwrap();
        let sqlite_storage = SqliteClientStorage::new(db_dir.into()).await;
        let sqlite_storage = sqlite_storage.unwrap();
        shared_tests::trim_test(sqlite_storage).await;
    }
}