hashiverse_lib/client/client_storage/
sqlite_client_storage.rs1use crate::client::client_storage::client_storage::{ClientStorage, BUCKETS};
14use crate::tools::time::TimeMillis;
15use anyhow::Context;
16use rusqlite::Connection;
17use std::path::PathBuf;
18use std::sync::Arc;
19use parking_lot::Mutex;
20
/// Client storage backed by a single SQLite database connection.
///
/// The connection is shared behind `Arc<Mutex<_>>`; the `ClientStorage` methods
/// implemented for this type lock it before running any SQL.
pub struct SqliteClientStorage {
    /// The open SQLite connection, guarded by a mutex.
    connection: Arc<Mutex<Connection>>,
}
24
25impl SqliteClientStorage {
26 pub async fn new(data_dir: PathBuf) -> anyhow::Result<Arc<Self>> {
29 let database_path = data_dir.join("client_storage.db");
30 let connection = Connection::open(&database_path)
31 .with_context(|| format!("Failed to open SQLite database at {}", database_path.display()))?;
32
33 connection.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
37
38 for bucket in BUCKETS {
39 let table_name = sanitize_bucket_name(bucket);
40 connection.execute_batch(&format!(
41 "CREATE TABLE IF NOT EXISTS [{table_name}] (
42 key TEXT PRIMARY KEY,
43 value BLOB NOT NULL,
44 last_accessed INTEGER NOT NULL
45 )"
46 ))?;
47 }
48
49 Ok(Arc::new(Self {
50 connection: Arc::new(Mutex::new(connection)),
51 }))
52 }
53}
54
/// Turns a bucket name into the table name used for it.
///
/// Every character that is neither alphanumeric nor `_` is replaced by `_`;
/// all other characters are kept as they are.
fn sanitize_bucket_name(bucket: &str) -> String {
    bucket
        .chars()
        .map(|ch| if ch == '_' || ch.is_alphanumeric() { ch } else { '_' })
        .collect()
}
58
59#[async_trait::async_trait]
60impl ClientStorage for SqliteClientStorage {
61 async fn count(&self, bucket: &str) -> anyhow::Result<usize> {
62 let table_name = sanitize_bucket_name(bucket);
63 let connection = self.connection.lock();
64 let count: usize = connection.query_row(
65 &format!("SELECT COUNT(*) FROM [{table_name}]"),
66 [],
67 |row| row.get(0),
68 )?;
69 Ok(count)
70 }
71
72 async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>> {
73 let table_name = sanitize_bucket_name(bucket);
74 let connection = self.connection.lock();
75 let mut statement = connection.prepare(&format!("SELECT key FROM [{table_name}]"))?;
76 let keys = statement
77 .query_map([], |row| row.get::<_, String>(0))?
78 .collect::<Result<Vec<_>, _>>()?;
79 Ok(keys)
80 }
81
82 async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>> {
83 let table_name = sanitize_bucket_name(bucket);
84 let connection = self.connection.lock();
85
86 let result: Option<Vec<u8>> = connection
87 .query_row(
88 &format!("SELECT value FROM [{table_name}] WHERE key = ?1"),
89 [key],
90 |row| row.get(0),
91 )
92 .optional()?;
93
94 if result.is_some() && time_millis > TimeMillis::zero() {
95 connection.execute(
96 &format!("UPDATE [{table_name}] SET last_accessed = ?1 WHERE key = ?2"),
97 rusqlite::params![time_millis.0, key],
98 )?;
99 }
100
101 Ok(result)
102 }
103
104 async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()> {
105 let table_name = sanitize_bucket_name(bucket);
106 let connection = self.connection.lock();
107 connection.execute(
108 &format!("INSERT OR REPLACE INTO [{table_name}] (key, value, last_accessed) VALUES (?1, ?2, ?3)"),
109 rusqlite::params![key, value, time_millis.0],
110 )?;
111 Ok(())
112 }
113
114 async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()> {
115 let table_name = sanitize_bucket_name(bucket);
116 let connection = self.connection.lock();
117 connection.execute(
118 &format!("DELETE FROM [{table_name}] WHERE key = ?1"),
119 [key],
120 )?;
121 Ok(())
122 }
123
124 async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()> {
125 let table_name = sanitize_bucket_name(bucket);
126 let connection = self.connection.lock();
127
128 let count: usize = connection.query_row(
129 &format!("SELECT COUNT(*) FROM [{table_name}]"),
130 [],
131 |row| row.get(0),
132 )?;
133
134 if count > max_count {
135 let num_to_delete = count - max_count;
136 connection.execute(
137 &format!("DELETE FROM [{table_name}] WHERE key IN (SELECT key FROM [{table_name}] ORDER BY last_accessed ASC LIMIT ?1)"),
138 [num_to_delete],
139 )?;
140 }
141
142 Ok(())
143 }
144
145 async fn reset(&self) -> anyhow::Result<()> {
146 let connection = self.connection.lock();
147 for bucket in BUCKETS {
148 let table_name = sanitize_bucket_name(bucket);
149 connection.execute_batch(&format!("DELETE FROM [{table_name}]"))?;
150 }
151 Ok(())
152 }
153}
154
155use rusqlite::OptionalExtension;
156
#[cfg(test)]
mod tests {
    use super::SqliteClientStorage;
    use crate::client::client_storage::client_storage;
    use crate::tools::tools::get_temp_dir;

    /// Runs the shared `add_test` storage scenario against a SQLite store
    /// created in a fresh temporary directory.
    #[tokio::test]
    async fn add_test() {
        // NOTE(review): the first tuple element presumably keeps the temp dir alive
        // for the duration of the test — confirm against `get_temp_dir`.
        let (_dir_guard, dir_path) = get_temp_dir().unwrap();
        let sqlite_storage = SqliteClientStorage::new(dir_path.into()).await.unwrap();
        client_storage::tests::add_test(sqlite_storage).await;
    }

    /// Runs the shared `trim_test` storage scenario against a SQLite store
    /// created in a fresh temporary directory.
    #[tokio::test]
    async fn trim_test() {
        let (_dir_guard, dir_path) = get_temp_dir().unwrap();
        let sqlite_storage = SqliteClientStorage::new(dir_path.into()).await.unwrap();
        client_storage::tests::trim_test(sqlite_storage).await;
    }
}