1use crate::tools::pow::{pow_generate_with_iteration_limit};
31use crate::tools::pow_required_estimator::PowRequiredEstimator;
32use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
33use crate::tools::tools;
34use crate::tools::types::{Hash, Pow, Salt};
35use log::trace;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38
39pub struct PowJobStatus {
40 pub label: String,
41 pub pow_min: Pow,
42 pub best_pow_so_far: Pow,
43}
44
45type JobId = u64;
46
47struct JobEntry {
48 label: String,
49 pow_min: Pow,
50 best_pow_so_far: Pow,
51}
52
53#[derive(Default)]
54pub struct JobTracker {
55 next_id: JobId,
56 jobs: HashMap<JobId, JobEntry>,
57}
58
59impl JobTracker {
60 pub fn add(&mut self, label: &str, pow_min: Pow) -> JobId {
61 let job_id = self.next_id;
62 self.next_id += 1;
63 self.jobs.insert(job_id, JobEntry { label: label.to_string(), pow_min, best_pow_so_far: Pow(0) });
64 job_id
65 }
66
67 pub fn update(&mut self, job_id: JobId, best_pow_so_far: Pow) {
68 if let Some(entry) = self.jobs.get_mut(&job_id) {
69 entry.best_pow_so_far = best_pow_so_far;
70 }
71 }
72
73 pub fn remove(&mut self, job_id: JobId) {
74 self.jobs.remove(&job_id);
75 }
76
77 pub fn snapshot(&self) -> Vec<PowJobStatus> {
78 self.jobs.values().map(|entry| PowJobStatus {
79 label: entry.label.clone(),
80 pow_min: entry.pow_min,
81 best_pow_so_far: entry.best_pow_so_far,
82 }).collect()
83 }
84}
85
86#[async_trait::async_trait]
102pub trait ParallelPowGenerator: Send + Sync {
103 async fn generate_best_effort(&self, label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
109
110 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)>;
112
113 fn active_jobs(&self) -> Vec<PowJobStatus>;
115}
116
117pub async fn generate_loop(
128 generator: &(dyn ParallelPowGenerator + '_),
129 tracker: &Arc<Mutex<JobTracker>>,
130 label: &str,
131 pow_min: Pow,
132 data_hash: Hash,
133) -> anyhow::Result<(Salt, Pow, Hash)> {
134 const BATCH_SIZE: usize = 64 * 1024;
135 let real_time_provider = RealTimeProvider::default();
136 let mut estimator = PowRequiredEstimator::new(real_time_provider.current_time_millis(), label, pow_min);
137 let job_id = tracker.lock().unwrap().add(label, pow_min);
138 loop {
139 let result = generator.generate_best_effort(label, BATCH_SIZE, pow_min, data_hash).await?;
140 if result.1 >= pow_min {
141 tracker.lock().unwrap().remove(job_id);
142 return Ok(result);
143 }
144 tracker.lock().unwrap().update(job_id, result.1);
145 let progress = estimator.record_batch_and_estimate(real_time_provider.current_time_millis(), BATCH_SIZE, result.1);
146 trace!("{}", progress);
147 tools::yield_now().await;
148 }
149}
150
151pub struct StubParallelPowGenerator {
156 tracker: Arc<Mutex<JobTracker>>,
157}
158
159impl StubParallelPowGenerator {
160 pub fn new() -> Self {
161 Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
162 }
163}
164
165impl Default for StubParallelPowGenerator {
166 fn default() -> Self { Self::new() }
167}
168
169#[async_trait::async_trait]
170impl ParallelPowGenerator for StubParallelPowGenerator {
171 async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
172 pow_generate_with_iteration_limit(iteration_limit, pow_min, &data_hash).await
173 }
174
175 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
176 generate_loop(self, &self.tracker, label, pow_min, data_hash).await
177 }
178
179 fn active_jobs(&self) -> Vec<PowJobStatus> {
180 self.tracker.lock().unwrap().snapshot()
181 }
182}
183
184#[cfg(not(target_arch = "wasm32"))]
189pub struct NativeParallelPowGenerator {
190 tracker: Arc<Mutex<JobTracker>>,
191}
192
193#[cfg(not(target_arch = "wasm32"))]
194impl NativeParallelPowGenerator {
195 pub fn new() -> Self {
196 Self { tracker: Arc::new(Mutex::new(JobTracker::default())) }
197 }
198}
199
200#[cfg(not(target_arch = "wasm32"))]
201impl Default for NativeParallelPowGenerator {
202 fn default() -> Self { Self::new() }
203}
204
205#[cfg(not(target_arch = "wasm32"))]
206#[async_trait::async_trait]
207impl ParallelPowGenerator for NativeParallelPowGenerator {
208 async fn generate_best_effort(&self, _label: &str, iteration_limit: usize, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
209 let num_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
210 let per_thread = (iteration_limit / num_threads).max(1);
211 let result = tokio::task::spawn_blocking(move || {
212 use rayon::prelude::*;
213 (0..num_threads)
214 .into_par_iter()
215 .map(|_| {
216 let mut best = (Salt::zero(), Pow(0), Hash::zero());
217 for _ in 0..per_thread {
218 let salt = Salt::random();
219 if let Ok((pow, hash)) = crate::tools::pow::pow_measure_from_data_hash(&data_hash, &salt) {
220 if pow > best.1 {
221 best = (salt, pow, hash);
222 if pow >= pow_min {
223 break;
224 }
225 }
226 }
227 }
228 best
229 })
230 .reduce(
231 || (Salt::zero(), Pow(0), Hash::zero()),
232 |a, b| if b.1 > a.1 { b } else { a },
233 )
234 })
235 .await?;
236 Ok(result)
237 }
238
239 async fn generate(&self, label: &str, pow_min: Pow, data_hash: Hash) -> anyhow::Result<(Salt, Pow, Hash)> {
240 generate_loop(self, &self.tracker, label, pow_min, data_hash).await
241 }
242
243 fn active_jobs(&self) -> Vec<PowJobStatus> {
244 self.tracker.lock().unwrap().snapshot()
245 }
246}
247
#[cfg(test)]
mod tests {
    use crate::tools::parallel_pow_generator::{ParallelPowGenerator, StubParallelPowGenerator};
    use crate::tools::pow::pow_compute_data_hash;
    use crate::tools::tools;
    use crate::tools::types::Pow;

    /// Threshold every generator under test must reach.
    const POW_MIN: Pow = Pow(12);

    /// Hashes 64 random bytes, asks `generator` for a proof of work on that
    /// hash, and asserts the returned `Pow` reaches [`POW_MIN`].
    async fn assert_reaches_pow_min(generator: &dyn ParallelPowGenerator) -> anyhow::Result<()> {
        let mut data = [0u8; 64];
        tools::random_fill_bytes(&mut data);
        let data_hash = pow_compute_data_hash(&[&data]);
        let (_, pow, _) = generator.generate("test", POW_MIN, data_hash).await?;
        assert!(pow >= POW_MIN);
        Ok(())
    }

    #[tokio::test]
    async fn stub_generates_valid_pow() -> anyhow::Result<()> {
        let generator = StubParallelPowGenerator::new();
        assert_reaches_pow_min(&generator).await
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn native_generates_valid_pow() -> anyhow::Result<()> {
        let generator = crate::tools::parallel_pow_generator::NativeParallelPowGenerator::new();
        assert_reaches_pow_min(&generator).await
    }
}