Skip to main content

hashiverse_client_wasm/
wasm_client_storage.rs

1use anyhow::anyhow;
2use hashiverse_lib::client::client_storage::client_storage::{ClientStorage, BUCKETS};
3use hashiverse_lib::tools::time::TimeMillis;
4use indexed_db_futures::database::Database;
5use indexed_db_futures::error::Error;
6use indexed_db_futures::prelude::*;
7use indexed_db_futures::transaction::TransactionMode;
8use indexed_db_futures::KeyPath;
9use log::{info, warn};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12
13const DATABASE_NAME: &str = "hashiverse.client_storage";
14
15#[derive(Serialize, Deserialize)]
16struct Record {
17    key: String,
18    value: Vec<u8>,
19}
20
21#[derive(Serialize, Deserialize)]
22struct RecordMetadata {
23    key: String,
24    last_accessed: i64,
25    length: u32,
26}
27
28pub struct WasmClientStorage {}
29
30fn build_table(db: &Database, name: &str) -> Result<(), Error> {
31    info!("Building table {}", name);
32
33    // Object
34    {
35        let _object_store = db.create_object_store(name).with_key_path(KeyPath::from("key")).build()?;
36    }
37
38    // Metadata
39    {
40        let name_metadata = format!("{}.metadata", name);
41
42        let object_store = db.create_object_store(name_metadata).with_key_path(KeyPath::from("key")).build()?;
43
44        object_store.create_index("last_accessed", KeyPath::from("last_accessed")).with_unique(false).with_multi_entry(false).build()?;
45    }
46
47    Ok(())
48}
49
50/// Opens (and if necessary creates/upgrades) the client storage IndexedDB.
51///
52/// Every call site goes through this so that the version and upgrade handler
53/// are always applied — even if the database did not exist yet.
54async fn open_database() -> anyhow::Result<Database> {
55    let database = Database::open(DATABASE_NAME)
56        .with_version(1u8)
57        .with_on_blocked(|event| {
58            warn!("indexed_db upgrade blocked: {:?}", event);
59            Ok(())
60        })
61        .with_on_upgrade_needed(|event, db| {
62            let old_version = event.old_version() as u64;
63            let new_version = event.new_version().map(|v| v as u64);
64            warn!("indexed_db upgrade needed from {:?} to {:?}", old_version, new_version);
65
66            match (old_version, new_version) {
67                (0, Some(1)) => {
68                    for bucket in BUCKETS {
69                        build_table(&db, bucket)?;
70                    }
71                }
72                _ => {
73                    warn!("Unhandled upgrade from indexed_db old={:?} to new={:?}", old_version, new_version);
74                }
75            }
76
77            Ok(())
78        })
79        .build()
80        .map_err(|e| anyhow!("{}", e))?
81        .await
82        .map_err(|e| anyhow!("{}", e))?;
83
84    Ok(database)
85}
86
87impl WasmClientStorage {
88    async fn get_database(&self) -> anyhow::Result<Database> {
89        open_database().await
90    }
91
92    pub async fn new() -> anyhow::Result<Arc<Self>> {
93        let _database = open_database().await?;
94        Ok(Arc::new(Self {}))
95    }
96}
97
98#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
100impl ClientStorage for WasmClientStorage {
101    async fn count(&self, bucket: &str) -> anyhow::Result<usize> {
102        let database = self.get_database().await?;
103        let result = try {
104            let transaction = database.transaction(bucket).with_mode(TransactionMode::Readwrite).build()?;
105            let object_store = transaction.object_store(bucket)?;
106            object_store.count().await? as usize
107        };
108
109        match result {
110            Ok(x) => Ok(x),
111            Err(e) => Err(anyhow::anyhow!("{}", e)),
112        }
113    }
114
115    async fn keys(&self, bucket: &str) -> anyhow::Result<Vec<String>> {
116        let database = self.get_database().await?;
117        let result = try {
118            let transaction = database.transaction(bucket).with_mode(TransactionMode::Readonly).build()?;
119            let object_store = transaction.object_store(bucket)?;
120            let all_records: Vec<Record> = object_store.get_all().serde()?.await?.collect::<Result<Vec<_>, _>>()?;
121            all_records.into_iter().map(|r| r.key).collect()
122        };
123        match result {
124            Ok(x) => Ok(x),
125            Err(e) => Err(anyhow::anyhow!("{}", e)),
126        }
127    }
128
129    async fn get(&self, bucket: &str, key: &str, time_millis: TimeMillis) -> anyhow::Result<Option<Vec<u8>>> {
130        let database = self.get_database().await?;
131        let result = try {
132            let bucket_metadata = format!("{}.metadata", bucket);
133            let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
134
135            let value = {
136                let object_store = transaction.object_store(bucket)?;
137                let record: Option<Record> = object_store.get(key).serde()?.await?;
138
139                match &record {
140                    Some(record) => Some(record.value.clone()),
141                    None => None,
142                }
143            };
144
145            // Update the metadata only if we have been told to do so
146            if time_millis > TimeMillis::zero() {
147                if let Some(value) = &value {
148                    let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
149                    object_store_metadata
150                        .put(RecordMetadata {
151                            key: key.to_string(),
152                            last_accessed: time_millis.0,
153                            length: value.len() as u32,
154                        })
155                        .serde()?
156                        .await?;
157                }
158            }
159
160            transaction.commit().await?;
161
162            value
163        };
164
165        match result {
166            Ok(x) => Ok(x),
167            Err(e) => Err(anyhow::anyhow!("{}", e)),
168        }
169    }
170
171    async fn put(&self, bucket: &str, key: &str, value: Vec<u8>, time_millis: TimeMillis) -> anyhow::Result<()> {
172        let database = self.get_database().await?;
173        let result = try {
174            let bucket_metadata = format!("{}.metadata", bucket);
175            let transaction = database.transaction([bucket, bucket_metadata.as_ref()]).with_mode(TransactionMode::Readwrite).build()?;
176
177            // Metadata
178            {
179                let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
180                object_store_metadata
181                    .put(RecordMetadata {
182                        key: key.to_string(),
183                        last_accessed: time_millis.0,
184                        length: value.len() as u32,
185                    })
186                    .serde()?
187                    .await?;
188            }
189
190            // Object
191            {
192                let object_store = transaction.object_store(bucket)?;
193                object_store.put(Record { key: key.to_string(), value }).serde()?.await?;
194            }
195
196            transaction.commit().await?;
197        };
198
199        match result {
200            Ok(x) => Ok(x),
201            Err(e) => Err(anyhow::anyhow!("{}", e)),
202        }
203    }
204
205    async fn remove(&self, bucket: &str, key: &str) -> anyhow::Result<()> {
206        let database = self.get_database().await?;
207        let result = try {
208            let bucket_metadata = format!("{}.metadata", bucket);
209            let transaction = database.transaction([bucket, bucket_metadata.as_ref()]).with_mode(TransactionMode::Readwrite).build()?;
210
211            // Metadata
212            {
213                let object_store_metadata = transaction.object_store(bucket_metadata.as_ref())?;
214                object_store_metadata.delete(key.to_string()).await?;
215            }
216
217            // Object
218            {
219                let object_store = transaction.object_store(bucket)?;
220                object_store.delete(key.to_string()).await?;
221            }
222
223            transaction.commit().await?;
224        };
225
226        match result {
227            Ok(x) => Ok(x),
228            Err(e) => Err(anyhow::anyhow!("{}", e)),
229        }
230    }
231
232    async fn trim(&self, bucket: &str, max_count: usize) -> anyhow::Result<()> {
233        let database = self.get_database().await?;
234        let result = try {
235            let bucket_metadata = format!("{}.metadata", bucket);
236            let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
237
238            let total = transaction.object_store(bucket_metadata.as_ref())?.count().await? as usize;
239            if total > max_count {
240                let num_to_delete = total - max_count;
241                info!("Trimming {} records from {}", num_to_delete, bucket);
242
243                // Cursor on the last_accessed index iterates oldest-first
244                if let Some(mut cursor) = transaction.object_store(bucket_metadata.as_ref())?.index("last_accessed")?.open_cursor().await? {
245                    for _ in 0..num_to_delete {
246                        let key: Option<String> = cursor.primary_key()?;
247                        if let Some(key) = key {
248                            cursor.delete()?.await?;
249                            transaction.object_store(bucket)?.delete(key).await?;
250                            cursor.advance_by(1).await?;
251                        }
252                    }
253                }
254            }
255
256            transaction.commit().await?;
257        };
258
259        match result {
260            Ok(_) => Ok(()),
261            Err(e) => Err(anyhow::anyhow!("{}", e)),
262        }
263    }
264
265    async fn reset(&self) -> anyhow::Result<()> {
266        info!("Resetting client storage");
267        let database = self.get_database().await?;
268        let result = try {
269            for bucket in BUCKETS {
270                let bucket_metadata = format!("{}.metadata", bucket);
271
272                let transaction = database.transaction([bucket, bucket_metadata.as_str()]).with_mode(TransactionMode::Readwrite).build()?;
273                {
274                    transaction.object_store(bucket)?.clear()?;
275                    transaction.object_store(bucket_metadata.as_ref())?.clear()?;
276                }
277                transaction.commit().await?;
278            }
279        };
280
281        match result {
282            Ok(_) => Ok(()),
283            Err(e) => Err(anyhow::anyhow!("{}", e)),
284        }
285    }
286}
287
288#[cfg(test)]
289pub mod tests {
290    extern crate wasm_bindgen_test;
291    use hashiverse_lib::client::client_storage::client_storage;
292    use wasm_bindgen_test::*;
293
294    wasm_bindgen_test_configure!(run_in_browser);
295
296    #[wasm_bindgen_test]
297    async fn add_test() {
298        use crate::wasm_client_storage::WasmClientStorage;
299        client_storage::tests::add_test(WasmClientStorage::new().await.unwrap()).await;
300    }
301
302    #[wasm_bindgen_test]
303    async fn trim_test() {
304        use crate::wasm_client_storage::WasmClientStorage;
305        client_storage::tests::trim_test(WasmClientStorage::new().await.unwrap()).await;
306    }
307}