hashiverse_client_wasm/
wasm_client_storage.rs1use anyhow::anyhow;
2use hashiverse_lib::client::client_storage::client_storage::{ClientStorage, BUCKETS};
3use hashiverse_lib::tools::time::TimeMillis;
4use indexed_db_futures::database::Database;
5use indexed_db_futures::error::Error;
6use indexed_db_futures::prelude::*;
7use indexed_db_futures::transaction::TransactionMode;
8use indexed_db_futures::KeyPath;
9use log::{info, warn};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12
/// Name of the IndexedDB database that holds every client-storage bucket.
const DATABASE_NAME: &'static str = "hashiverse.client_storage";
14
15#[derive(Serialize, Deserialize)]
16struct Record {
17 key: String,
18 value: Vec<u8>,
19}
20
21#[derive(Serialize, Deserialize)]
22struct RecordMetadata {
23 key: String,
24 last_accessed: i64,
25 length: u32,
26}
27
/// IndexedDB-backed client storage.
///
/// The struct carries no state of its own: each operation obtains its
/// database handle by opening the database again.
pub struct WasmClientStorage {}
29
30fn build_table(db: &Database, name: &str) -> Result<(), Error> {
31 info!("Building table {}", name);
32
33 {
35 let _object_store = db.create_object_store(name).with_key_path(KeyPath::from("key")).build()?;
36 }
37
38 {
40 let name_metadata = format!("{}.metadata", name);
41
42 let object_store = db.create_object_store(name_metadata).with_key_path(KeyPath::from("key")).build()?;
43
44 object_store.create_index("last_accessed", KeyPath::from("last_accessed")).with_unique(false).with_multi_entry(false).build()?;
45 }
46
47 Ok(())
48}
49
50async fn open_database() -> anyhow::Result<Database> {
55 let database = Database::open(DATABASE_NAME)
56 .with_version(1u8)
57 .with_on_blocked(|event| {
58 warn!("indexed_db upgrade blocked: {:?}", event);
59 Ok(())
60 })
61 .with_on_upgrade_needed(|event, db| {
62 let old_version = event.old_version() as u64;
63 let new_version = event.new_version().map(|v| v as u64);
64 warn!("indexed_db upgrade needed from {:?} to {:?}", old_version, new_version);
65
66 match (old_version, new_version) {
67 (0, Some(1)) => {
68 for bucket in BUCKETS {
69 build_table(&db, bucket)?;
70 }
71 }
72 _ => {
73 warn!("Unhandled upgrade from indexed_db old={:?} to new={:?}", old_version, new_version);
74 }
75 }
76
77 Ok(())
78 })
79 .build()
80 .map_err(|e| anyhow!("{}", e))?
81 .await
82 .map_err(|e| anyhow!("{}", e))?;
83
84 Ok(database)
85}
86
87impl WasmClientStorage {
88 async fn get_database(&self) -> anyhow::Result<Database> {
89 open_database().await
90 }
91
92 pub async fn new() -> anyhow::Result<Arc<Self>> {
93 let _database = open_database().await?;
94 Ok(Arc::new(Self {}))
95 }
96}
97
98#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
100impl ClientStorage for WasmClientStorage {
101 async fn count(&self, bucket: &str) -> anyhow::Result<usize> {
102 let database = self.get_database().await?;
103 let result = try {
104 let transaction = database.transaction(bucket).with_mode(TransactionMode::Readwrite).build()?;
105 let object_store = transaction.object_store(bucket)?;
106 object_store.count().await? as usize
107 };
108
109 match result {
110 Ok(x) => Ok(x),
111 Err(e) => Err(anyhow::anyhow!("{}", e)),
112 }
113 }
114
115 async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>> {
116 let database = self.get_database().await?;
117 let result = try {
118 let transaction = database.transaction(bucket).with_mode(TransactionMode::Readonly).build()?;
119 let object_store = transaction.object_store(bucket)?;
120 let all_records: Vec<Record> = object_store.get_all().serde()?.await?.collect::<Result<Vec<_>, _>>()?;
121 all_records.into_iter().map(|r| r.key).collect()
122 };
123 match result {
124 Ok(x) => Ok(x),
125 Err(e) => Err(anyhow::anyhow!("{}", e)),
126 }
127 }
128
129 async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>> {
130 let database = self.get_database().await?;
131 let result = try {
132 let bucket_metadata = format!("{}.metadata", bucket);
133 let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
134
135 let value = {
136 let object_store = transaction.object_store(bucket)?;
137 let record: Option<Record> = object_store.get(key).serde()?.await?;
138
139 match &record {
140 Some(record) => Some(record.value.clone()),
141 None => None,
142 }
143 };
144
145 if time_millis > TimeMillis::zero() {
147 if let Some(value) = &value {
148 let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
149 object_store_metadata
150 .put(RecordMetadata {
151 key: key.to_string(),
152 last_accessed: time_millis.0,
153 length: value.len() as u32,
154 })
155 .serde()?
156 .await?;
157 }
158 }
159
160 transaction.commit().await?;
161
162 value
163 };
164
165 match result {
166 Ok(x) => Ok(x),
167 Err(e) => Err(anyhow::anyhow!("{}", e)),
168 }
169 }
170
171 async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()> {
172 let database = self.get_database().await?;
173 let result = try {
174 let bucket_metadata = format!("{}.metadata", bucket);
175 let transaction = database.transaction([bucket, bucket_metadata.as_ref()]).with_mode(TransactionMode::Readwrite).build()?;
176
177 {
179 let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
180 object_store_metadata
181 .put(RecordMetadata {
182 key: key.to_string(),
183 last_accessed: time_millis.0,
184 length: value.len() as u32,
185 })
186 .serde()?
187 .await?;
188 }
189
190 {
192 let object_store = transaction.object_store(bucket)?;
193 object_store.put(Record { key: key.to_string(), value }).serde()?.await?;
194 }
195
196 transaction.commit().await?;
197 };
198
199 match result {
200 Ok(x) => Ok(x),
201 Err(e) => Err(anyhow::anyhow!("{}", e)),
202 }
203 }
204
205 async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()> {
206 let database = self.get_database().await?;
207 let result = try {
208 let bucket_metadata = format!("{}.metadata", bucket);
209 let transaction = database.transaction([bucket, bucket_metadata.as_ref()]).with_mode(TransactionMode::Readwrite).build()?;
210
211 {
213 let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
214 object_store_metadata.delete(key.to_string()).await?;
215 }
216
217 {
219 let object_store = transaction.object_store(bucket)?;
220 object_store.delete(key.to_string()).await?;
221 }
222
223 transaction.commit().await?;
224 };
225
226 match result {
227 Ok(x) => Ok(x),
228 Err(e) => Err(anyhow::anyhow!("{}", e)),
229 }
230 }
231
232 async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()> {
233 let database = self.get_database().await?;
234 let result = try {
235 let bucket_metadata = format!("{}.metadata", bucket);
236 let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
237
238 let total = transaction.object_store(bucket_metadata.as_ref())?.count().await? as usize;
239 if total > max_count {
240 let num_to_delete = total - max_count;
241 info!("Trimming {} records from {}", num_to_delete, bucket);
242
243 if let Some(mut cursor) = transaction.object_store(bucket_metadata.as_ref())?.index("last_accessed")?.open_cursor().await? {
245 for _ in 0..num_to_delete {
246 let key: Option<String> = cursor.primary_key()?;
247 if let Some(key) = key {
248 cursor.delete()?.await?;
249 transaction.object_store(bucket)?.delete(key).await?;
250 cursor.advance_by(1).await?;
251 }
252 }
253 }
254 }
255
256 transaction.commit().await?;
257 };
258
259 match result {
260 Ok(_) => Ok(()),
261 Err(e) => Err(anyhow::anyhow!("{}", e)),
262 }
263 }
264
265 async fn reset(&self) -> anyhow::Result<()> {
266 info!("Resetting client storage");
267 let database = self.get_database().await?;
268 let result = try {
269 for bucket in BUCKETS {
270 let bucket_metadata = format!("{}.metadata", bucket);
271
272 let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
273 {
274 transaction.object_store(bucket)?.clear()?;
275 transaction.object_store(bucket_metadata.as_ref())?.clear()?;
276 }
277 transaction.commit().await?;
278 }
279 };
280
281 match result {
282 Ok(_) => Ok(()),
283 Err(e) => Err(anyhow::anyhow!("{}", e)),
284 }
285 }
286}
287
/// Runs the shared `ClientStorage` test suite from `hashiverse_lib` against
/// [`WasmClientStorage`]; the tests are configured to run in a browser.
#[cfg(test)]
pub mod tests {
    extern crate wasm_bindgen_test;
    use crate::wasm_client_storage::WasmClientStorage;
    use hashiverse_lib::client::client_storage::client_storage;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    #[wasm_bindgen_test]
    async fn add_test() {
        let storage = WasmClientStorage::new().await.unwrap();
        client_storage::tests::add_test(storage).await;
    }

    #[wasm_bindgen_test]
    async fn trim_test() {
        let storage = WasmClientStorage::new().await.unwrap();
        client_storage::tests::trim_test(storage).await;
    }
}