Skip to main content

hashiverse_lib/tools/
parallel_pow_generator.rs

1//! # Parallel proof-of-work search engine
2//!
3//! Proof-of-work is mandatory on every outgoing RPC, every peer announcement, and every
4//! piece of report/feedback — so finding a PoW solution quickly is on the hot path for
5//! virtually every client and server action. This module isolates that work behind a
6//! single trait, [`ParallelPowGenerator`], so the calling code doesn't care whether it's
7//! running on a 32-core server or a single-threaded WASM Web Worker.
8//!
9//! ## Implementations
10//!
11//! - [`NativeParallelPowGenerator`] — rayon + `tokio::task::spawn_blocking`, saturates
12//!   every CPU core on native targets.
13//! - [`StubParallelPowGenerator`] — single-threaded fallback that works on every target
14//!   including WASM. Browser clients run this (with a relaxed `pow_min`) because Web
15//!   Workers don't expose thread pools.
16//!
17//! ## Observability
18//!
19//! [`JobTracker`] + [`PowJobStatus`] expose the set of in-flight PoW jobs and the
20//! best-so-far pow for each. The web client surfaces this in the UI so that when a post
21//! feels slow to send the user can see it's because PoW is still grinding.
22//!
23//! ## Shared loop
24//!
25//! [`generate_loop`] is the one-true batching loop used by both implementations: repeatedly
26//! call [`ParallelPowGenerator::generate_best_effort`] in 64K-attempt batches, update the
27//! tracker, and bail as soon as a batch returns `pow >= pow_min`. Every batch also yields
28//! to the runtime so on single-threaded targets other tasks still get a chance to run.
29
30use crate::tools::pow::{pow_generate_with_iteration_limit};
31use crate::tools::pow_required_estimator::PowRequiredEstimator;
32use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
33use crate::tools::tools;
34use crate::tools::types::{Hash, Pow, Salt};
35use log::trace;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39pub struct PowJobStatus {
40    pub label: String,
41    pub pow_min: Pow,
42    pub best_pow_so_far: Pow,
43}
44
/// Monotonically increasing identifier handed out by [`JobTracker::add`].
type JobId = u64;

/// Internal per-job bookkeeping; mirrored into the public [`PowJobStatus`] on snapshot.
struct JobEntry {
    // Human-readable job name for observability (e.g. "rpc", "feedback").
    label: String,
    // Target pow the job is grinding towards.
    pow_min: Pow,
    // Best pow reported for this job via JobTracker::update.
    best_pow_so_far: Pow,
}
52
/// Registry of in-flight PoW jobs, shared behind `Arc<Mutex<..>>` by each generator so
/// `active_jobs()` can report progress while `generate()` calls are still grinding.
#[derive(Default)]
pub struct JobTracker {
    // Next JobId to hand out; u64 makes overflow a practical non-issue.
    next_id: JobId,
    // All currently in-flight jobs, keyed by their id.
    jobs: HashMap<JobId, JobEntry>,
}
58
59impl JobTracker {
60    pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
61        let job_id = self.next_id;
62        self.next_id += 1;
63        self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
64        job_id
65    }
66
67    pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
68        if let Some(entry) = self.jobs.get_mut(&job_id) {
69            entry.best_pow_so_far = best_pow_so_far;
70        }
71    }
72
73    pub fn remove(&mut self, job_id: JobId) {
74        self.jobs.remove(&job_id);
75    }
76
77    pub fn snapshot(&self) -> Vec<PowJobStatus> {
78        self.jobs.values().map(|entry| PowJobStatus {
79            label: entry.label.clone(),
80            pow_min: entry.pow_min,
81            best_pow_so_far: entry.best_pow_so_far,
82        }).collect()
83    }
84}
85
/// A pluggable engine for searching for proof-of-work solutions in parallel.
///
/// Proof-of-work is required on every RPC packet, on peer announcements, and on report /
/// feedback submissions, so finding PoW is on the hot path for every outbound action a client
/// or server takes. `ParallelPowGenerator` abstracts over the concrete way we parallelize that
/// search so the calling code stays platform-agnostic:
///
/// - [`NativeParallelPowGenerator`] uses `rayon` + `tokio::task::spawn_blocking` to pin the
///   search across all CPU cores on native targets.
/// - [`StubParallelPowGenerator`] is a single-threaded fallback that works on every target,
///   including WASM. Browser clients use this (with a relaxed `pow_min`) because Web Workers
///   do not expose `rayon` / threads directly.
///
/// Implementations must also maintain the `active_jobs()` observability view — the UI surfaces
/// in-progress PoW searches to end users so they understand why an action is slow.
#[async_trait::async_trait]
pub trait ParallelPowGenerator: Send + Sync {
    /// Run up to `iteration_limit` hash attempts and return the best `(Salt, Pow, Hash)` found.
    /// Exits early if `pow >= pow_min` is achieved.
    ///
    /// Note: the returned pow may be below `pow_min` — this is a best-effort single batch.
    ///
    /// `label` is a human-readable job name for observability (e.g. `"rpc"`, `"feedback"`).
    /// `data_hash` must be pre-computed via `pow_compute_data_hash` before calling.
    async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;

    /// Loop `generate_best_effort` in batches until `pow >= pow_min` is achieved.
    /// Both provided implementations delegate to [`generate_loop`].
    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;

    /// Snapshot of all concurrently in-flight `generate` calls.
    fn active_jobs(&self) -> Vec<PowJobStatus>;
}
116
117/// Shared loop logic for `generate`: repeatedly calls `generate_best_effort` in
118/// `BATCH_SIZE` batches until `pow >= pow_min`, tracking progress via `JobTracker`.
119///
120/// Future optimization: the current batch-and-wait approach dispatches to all N workers,
121/// then waits for all N to respond before dispatching the next batch. This means fast
122/// workers sit idle while the slowest worker finishes. A better design would feed workers
123/// individually as they complete (work-stealing / pool-style), maintaining a shared
124/// "best result so far" per job and checking pow_min after each worker result. This would
125/// also allow concurrent generate() calls to have their batches truly interleaved at the
126/// individual-worker level rather than at the batch level.
127pub async fn generate_loop(
128    generator: &(dyn ParallelPowGenerator + '_),
129    tracker: &Arc<Mutex<JobTracker>>,
130    label: &str,
131    pow_min: Pow,
132    data_hash: Hash,
133) -> anyhow::Result<(Salt, Pow, Hash)> {
134    const BATCH_SIZE: usize = 64 * 1024;
135    let real_time_provider = RealTimeProvider::default();
136    let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
137    let job_id = tracker.lock().unwrap().add(label, pow_min);
138    loop {
139        let result = generator.generate_best_effort(label, BATCH_SIZE, pow_min, data_hash).await?;
140        if result.1 >= pow_min {
141            tracker.lock().unwrap().remove(job_id);
142            return Ok(result);
143        }
144        tracker.lock().unwrap().update(job_id, result.1);
145        let progress = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), BATCH_SIZE, result.1);
146        trace!("{}", progress);
147        tools::yield_now().await;
148    }
149}
150
151// ──────────────────────────────────────────────────────────────────────────────
152// StubParallelPowGenerator — single-threaded, works on all platforms
153// ──────────────────────────────────────────────────────────────────────────────
154
155pub struct StubParallelPowGenerator {
156    tracker: Arc<Mutex<JobTracker>>,
157}
158
159impl StubParallelPowGenerator {
160    pub fn new() -> Self {
161        Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
162    }
163}
164
165impl Default for StubParallelPowGenerator {
166    fn default() -> Self { Self::new() }
167}
168
#[async_trait::async_trait]
impl ParallelPowGenerator for StubParallelPowGenerator {
    /// Single-threaded search: delegates directly to the core iteration-limited
    /// generator, ignoring `label` (no per-batch observability at this level).
    async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
        pow_generate_with_iteration_limit(iteration_limit, pow_min, &data_hash).await
    }

    /// Batched search loop shared with the native implementation; see [`generate_loop`].
    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
        generate_loop(self, &self.tracker, label, pow_min, data_hash).await
    }

    fn active_jobs(&self) -> Vec<PowJobStatus> {
        self.tracker.lock().unwrap().snapshot()
    }
}
183
184// ──────────────────────────────────────────────────────────────────────────────
185// NativeParallelPowGenerator — rayon + spawn_blocking, non-WASM only
186// ──────────────────────────────────────────────────────────────────────────────
187
188#[cfg(not(target_arch = "wasm32"))]
189pub struct NativeParallelPowGenerator {
190    tracker: Arc<Mutex<JobTracker>>,
191}
192
193#[cfg(not(target_arch = "wasm32"))]
194impl NativeParallelPowGenerator {
195    pub fn new() -> Self {
196        Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
197    }
198}
199
200#[cfg(not(target_arch = "wasm32"))]
201impl Default for NativeParallelPowGenerator {
202    fn default() -> Self { Self::new() }
203}
204
#[cfg(not(target_arch = "wasm32"))]
#[async_trait::async_trait]
impl ParallelPowGenerator for NativeParallelPowGenerator {
    /// Splits the iteration budget evenly across all logical CPUs and searches in
    /// parallel with rayon, inside `spawn_blocking` so the CPU-bound grind never
    /// stalls the tokio executor threads.
    async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
        // One rayon work item per logical CPU; fall back to 1 if the count is unknown.
        let num_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        // Integer division drops up to num_threads-1 attempts from the budget (and the
        // .max(1) can slightly exceed it when iteration_limit < num_threads) — both
        // acceptable for a best-effort batch.
        let per_thread = (iteration_limit / num_threads).max(1);
        let result = tokio::task::spawn_blocking(move || {
            use rayon::prelude::*;
            (0..num_threads)
                .into_par_iter()
                .map(|_| {
                    // Per-worker best (salt, pow, hash) seen so far.
                    let mut best = (Salt::zero(), Pow(0), Hash::zero());
                    for _ in 0..per_thread {
                        let salt = Salt::random();
                        // Measurement failures are skipped; the attempt is simply wasted.
                        if let Ok((pow, hash)) = crate::tools::pow::pow_measure_from_data_hash(&data_hash, &salt) {
                            if pow > best.1 {
                                best = (salt, pow, hash);
                                // This worker hit the target — stop its own loop early.
                                // Other workers still run out their full per_thread budget
                                // (no cross-worker cancellation).
                                if pow >= pow_min {
                                    break;
                                }
                            }
                        }
                    }
                    best
                })
                // Fold all per-worker results down to the single highest pow.
                .reduce(
                    || (Salt::zero(), Pow(0), Hash::zero()),
                    |a, b| if b.1 > a.1 { b } else { a },
                )
        })
        // spawn_blocking only errs if the blocking task panicked or was cancelled.
        .await?;
        Ok(result)
    }

    /// Batched search loop shared with the stub implementation; see [`generate_loop`].
    async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
        generate_loop(self, &self.tracker, label, pow_min, data_hash).await
    }

    fn active_jobs(&self) -> Vec<PowJobStatus> {
        self.tracker.lock().unwrap().snapshot()
    }
}
247
#[cfg(test)]
mod tests {
    use crate::tools::parallel_pow_generator::{ParallelPowGenerator, StubParallelPowGenerator};
    use crate::tools::pow::pow_compute_data_hash;
    use crate::tools::tools;
    use crate::tools::types::{Hash, Pow};

    /// Low threshold so tests finish quickly while still exercising the full loop.
    const POW_MIN: Pow = Pow(12);

    /// Builds a data hash over 64 random bytes, mirroring what real callers do
    /// before starting a PoW search.
    fn random_data_hash() -> Hash {
        let mut data = [0u8; 64];
        tools::random_fill_bytes(&mut data);
        pow_compute_data_hash(&[&data])
    }

    #[tokio::test]
    async fn stub_generates_valid_pow() -> anyhow::Result<()> {
        let generator = StubParallelPowGenerator::new();
        let (_, pow, _) = generator.generate("test", POW_MIN, random_data_hash()).await?;
        assert!(pow >= POW_MIN);
        Ok(())
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn native_generates_valid_pow() -> anyhow::Result<()> {
        let generator = crate::tools::parallel_pow_generator::NativeParallelPowGenerator::new();
        let (_, pow, _) = generator.generate("test", POW_MIN, random_data_hash()).await?;
        assert!(pow >= POW_MIN);
        Ok(())
    }
}