hashiverse_client_wasm/
wasm_parallel_pow_generator.rs1use hashiverse_lib::tools::parallel_pow_generator::{generate_loop, JobTracker, ParallelPowGenerator, PowJobStatus};
2use hashiverse_lib::tools::types::{Hash, Pow, Salt};
3use js_sys::{Array, Object, Reflect};
4use log::{info, warn};
5use send_wrapper::SendWrapper;
6use std::sync::{Arc, Mutex};
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{MessageChannel, MessageEvent, Worker};
10
11pub struct WasmParallelPowGenerator {
15 tracker: Arc<Mutex<JobTracker>>,
16 workers: Vec<Worker>,
17}
18
19unsafe impl Send for WasmParallelPowGenerator {}
22unsafe impl Sync for WasmParallelPowGenerator {}
23
24impl WasmParallelPowGenerator {
25 pub fn from_workers(workers: Vec<Worker>) -> Self {
27 info!("WasmParallelPowGenerator: received {} pow workers", workers.len());
28 Self {
29 tracker: Arc::new(Mutex::new(JobTracker::default())),
30 workers,
31 }
32 }
33}
34
35#[async_trait::async_trait]
36impl ParallelPowGenerator for WasmParallelPowGenerator {
37 async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
38 if self.workers.is_empty() {
39 anyhow::bail!("No pow workers available");
40 }
41
42 let num_workers = self.workers.len();
43 let per_worker = (iteration_limit / num_workers).max(1);
44 let data_hash_hex = hex::encode(data_hash);
45
46 let inner = async {
49 let mut response_futures = Vec::with_capacity(num_workers);
53 for worker in &self.workers {
54 let channel = MessageChannel::new()
55 .map_err(|e| anyhow::anyhow!("Failed to create MessageChannel: {:?}", e))?;
56
57 let port1 = channel.port1();
58 let port2 = channel.port2();
59
60 let promise = js_sys::Promise::new(&mut |resolve, _reject| {
62 let resolve_clone = resolve.clone();
63 let onmessage = Closure::once_into_js(move |event: MessageEvent| {
64 resolve_clone.call1(&JsValue::NULL, &event.data()).ok();
65 });
66 port1.set_onmessage(Some(onmessage.unchecked_ref()));
67 });
68
69 let msg = Object::new();
71 Reflect::set(&msg, &JsValue::from_str("iteration_limit"), &JsValue::from_f64(per_worker as f64)).ok();
72 Reflect::set(&msg, &JsValue::from_str("pow_min"), &JsValue::from_f64(pow_min.0 as f64)).ok();
73 Reflect::set(&msg, &JsValue::from_str("data_hash_hex"), &JsValue::from_str(&data_hash_hex)).ok();
74
75 let transfer = Array::new();
77 transfer.push(&port2);
78 worker.post_message_with_transfer(&msg, &transfer)
79 .map_err(|e| anyhow::anyhow!("Failed to post message to pow worker: {:?}", e))?;
80
81 response_futures.push(JsFuture::from(promise));
82 }
83
84 let mut best = (Salt::zero(), Pow(0), Hash::zero());
86 for future in response_futures {
87 let response_data = future.await
88 .map_err(|e| anyhow::anyhow!("Pow worker response error: {:?}", e))?;
89
90 if let Some(result_str) = Reflect::get(&response_data, &JsValue::from_str("result"))
91 .ok()
92 .and_then(|v| v.as_string())
93 {
94 if let Some(parsed) = parse_batch_result(&result_str) {
95 if parsed.1 > best.1 {
96 best = parsed;
97 }
98 }
99 }
100 }
101
102 Ok::<_, anyhow::Error>(best)
103 };
104
105 SendWrapper::new(inner).await
106 }
107
108 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
109 generate_loop(self, &self.tracker, label, pow_min, data_hash).await
110 }
111
112 fn active_jobs(&self) -> Vec<PowJobStatus> {
113 self.tracker.lock().unwrap().snapshot()
114 }
115}
116
117fn parse_batch_result(result: &str) -> Option<(Salt, Pow, Hash)> {
119 let parts: Vec<&str> = result.splitn(3, ':').collect();
120 if parts.len() != 3 {
121 warn!("Invalid pow_compute_batch result format: {}", result);
122 return None;
123 }
124
125 let salt_bytes = hex::decode(parts[0]).ok()?;
126 let pow_val: u8 = parts[1].parse().ok()?;
127 let hash_bytes = hex::decode(parts[2]).ok()?;
128
129 let salt = Salt::from_slice(&salt_bytes).ok()?;
130 let hash = Hash::from_slice(&hash_bytes).ok()?;
131
132 Some((salt, Pow(pow_val), hash))
133}