Skip to main content

hashiverse_client_wasm/
wasm_parallel_pow_generator.rs

1use hashiverse_lib::tools::parallel_pow_generator::{generate_loop, JobTracker, ParallelPowGenerator, PowJobStatus};
2use hashiverse_lib::tools::types::{Hash, Pow, Salt};
3use js_sys::{Array, Object, Reflect};
4use log::{info, warn};
5use send_wrapper::SendWrapper;
6use std::sync::{Arc, Mutex};
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{MessageChannel, MessageEvent, Worker};
10
11/// A `ParallelPowGenerator` that distributes PoW work across pre-created Web Workers.
12/// The TypeScript side is responsible for spawning and initializing the workers;
13/// this struct simply receives the ready `Worker` handles.
14pub struct WasmParallelPowGenerator {
15    tracker: Arc<Mutex<JobTracker>>,
16    workers: Vec<Worker>,
17}
18
19// Safety: In WASM, everything is single-threaded. Worker handles are not Send
20// in web-sys's type system, but we never actually move them across threads.
21unsafe impl Send for WasmParallelPowGenerator {}
22unsafe impl Sync for WasmParallelPowGenerator {}
23
24impl WasmParallelPowGenerator {
25    /// Create a new generator from pre-initialized Worker handles.
26    pub fn from_workers(workers: Vec<Worker>) -> Self {
27        info!("WasmParallelPowGenerator: received {} pow workers", workers.len());
28        Self {
29            tracker: Arc::new(Mutex::new(JobTracker::default())),
30            workers,
31        }
32    }
33}
34
35#[async_trait::async_trait]
36impl ParallelPowGenerator for WasmParallelPowGenerator {
37    async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
38        if self.workers.is_empty() {
39            anyhow::bail!("No pow workers available");
40        }
41
42        let num_workers = self.workers.len();
43        let per_worker = (iteration_limit / num_workers).max(1);
44        let data_hash_hex = hex::encode(data_hash);
45
46        // We need SendWrapper because JsFuture and Worker are !Send, but the
47        // trait requires Send futures. In WASM everything is single-threaded.
48        let inner = async {
49            // Dispatch work to all workers using a MessageChannel per worker.
50            // Each call gets its own isolated ports, so concurrent generate_best_effort
51            // calls don't clobber each other's onmessage handlers.
52            let mut response_futures = Vec::with_capacity(num_workers);
53            for worker in &self.workers {
54                let channel = MessageChannel::new()
55                    .map_err(|e| anyhow::anyhow!("Failed to create MessageChannel: {:?}", e))?;
56
57                let port1 = channel.port1();
58                let port2 = channel.port2();
59
60                // Listen for the response on port1
61                let promise = js_sys::Promise::new(&mut |resolve, _reject| {
62                    let resolve_clone = resolve.clone();
63                    let onmessage = Closure::once_into_js(move |event: MessageEvent| {
64                        resolve_clone.call1(&JsValue::NULL, &event.data()).ok();
65                    });
66                    port1.set_onmessage(Some(onmessage.unchecked_ref()));
67                });
68
69                // Build the request message
70                let msg = Object::new();
71                Reflect::set(&msg, &JsValue::from_str("iteration_limit"), &JsValue::from_f64(per_worker as f64)).ok();
72                Reflect::set(&msg, &JsValue::from_str("pow_min"), &JsValue::from_f64(pow_min.0 as f64)).ok();
73                Reflect::set(&msg, &JsValue::from_str("data_hash_hex"), &JsValue::from_str(&data_hash_hex)).ok();
74
75                // Transfer port2 to the worker so it can reply on it
76                let transfer = Array::new();
77                transfer.push(&port2);
78                worker.post_message_with_transfer(&msg, &transfer)
79                    .map_err(|e| anyhow::anyhow!("Failed to post message to pow worker: {:?}", e))?;
80
81                response_futures.push(JsFuture::from(promise));
82            }
83
84            // Await all responses and pick the best result
85            let mut best = (Salt::zero(), Pow(0), Hash::zero());
86            for future in response_futures {
87                let response_data = future.await
88                    .map_err(|e| anyhow::anyhow!("Pow worker response error: {:?}", e))?;
89
90                if let Some(result_str) = Reflect::get(&response_data, &JsValue::from_str("result"))
91                    .ok()
92                    .and_then(|v| v.as_string())
93                {
94                    if let Some(parsed) = parse_batch_result(&result_str) {
95                        if parsed.1 > best.1 {
96                            best = parsed;
97                        }
98                    }
99                }
100            }
101
102            Ok::<_, anyhow::Error>(best)
103        };
104
105        SendWrapper::new(inner).await
106    }
107
108    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
109        generate_loop(self, &self.tracker, label, pow_min, data_hash).await
110    }
111
112    fn active_jobs(&self) -> Vec<PowJobStatus> {
113        self.tracker.lock().unwrap().snapshot()
114    }
115}
116
117/// Parse the `salt_hex:pow_u8:hash_hex` result string from `pow_compute_batch`.
118fn parse_batch_result(result: &str) -> Option<(Salt, Pow, Hash)> {
119    let parts: Vec<&str> = result.splitn(3, ':').collect();
120    if parts.len() != 3 {
121        warn!("Invalid pow_compute_batch result format: {}", result);
122        return None;
123    }
124
125    let salt_bytes = hex::decode(parts[0]).ok()?;
126    let pow_val: u8 = parts[1].parse().ok()?;
127    let hash_bytes = hex::decode(parts[2]).ok()?;
128
129    let salt = Salt::from_slice(&salt_bytes).ok()?;
130    let hash = Hash::from_slice(&hash_bytes).ok()?;
131
132    Some((salt, Pow(pow_val), hash))
133}