hashiverse_lib/client/timeline/
recursive_bucket_visitor.rs1use crate::tools::buckets::BucketLocation;
21use crate::tools::time::{DurationMillis, TimeMillis};
22
/// Decision returned by the `on_bucket_open` callback of
/// [`RecursiveBucketVisitor::visit`] after a bucket has been opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecursiveBucketVisitorOpenCallbackResult {
    /// Keep walking and descend into this bucket's children (the next, finer
    /// entry of `bucket_durations`, if there is one). The bucket is closed
    /// after its children have been visited.
    ContinueWithChildren,
    /// Keep walking but do not descend: the bucket is closed next and the
    /// walk moves on to the previous bucket of the same duration.
    ContinueWithoutChildren,
    /// Abort the whole walk immediately; `visit` returns `Ok(false)` and no
    /// further open or close callbacks are invoked.
    Stop,
}
28
/// Decision returned by the `on_bucket_close` callback of
/// [`RecursiveBucketVisitor::visit`] once a bucket (and any children that
/// were descended into) has been fully visited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecursiveBucketVisitorCloseCallbackResult {
    /// Keep walking.
    Continue,
    /// Abort the whole walk immediately; `visit` returns `Ok(false)`.
    Stop,
}
33
34pub struct RecursiveBucketVisitor {}
35
36impl RecursiveBucketVisitor {
37 pub async fn visit<FOpen, FClose>(time_millis_max_exclusive: TimeMillis, time_millis_min: TimeMillis, bucket_durations: &[DurationMillis], on_bucket_open: &mut FOpen, on_bucket_close: &mut FClose) -> anyhow::Result<bool>
38 where
39 FOpen: AsyncFnMut(TimeMillis, DurationMillis) -> anyhow::Result<RecursiveBucketVisitorOpenCallbackResult>,
40 FClose: AsyncFnMut(TimeMillis, DurationMillis) -> anyhow::Result<RecursiveBucketVisitorCloseCallbackResult>,
41 {
42 struct FrameOpen {
43 bucket_time_millis: TimeMillis,
44 bucket_duration_i: usize,
45 oldest_bucket_time_millis_allowed: TimeMillis,
46 }
47
48 struct FrameClose {
49 bucket_time_millis: TimeMillis,
50 bucket_duration_i: usize,
51 }
52
53 enum Frame {
54 Open(FrameOpen),
55 Close(FrameClose),
56 }
57
58 anyhow::ensure!(!bucket_durations.is_empty(), "bucket_durations must have at least one element");
59
60 let mut frames = Vec::new();
61
62 {
64 let next_bucket_duration_i = 0;
65 let next_bucket_duration = bucket_durations[next_bucket_duration_i];
66
67 let next_latest_bucket_time_millis = BucketLocation::round_down_to_bucket_start(time_millis_max_exclusive, next_bucket_duration);
68 let next_oldest_bucket_time_millis_allowed = BucketLocation::round_down_to_bucket_start(time_millis_min, next_bucket_duration);
69
70 frames.push(Frame::Open(FrameOpen {
71 bucket_time_millis: next_latest_bucket_time_millis,
72 bucket_duration_i: next_bucket_duration_i,
73 oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
74 }));
75 }
76
77 while let Some(frame) = frames.pop() {
79 match frame {
80 Frame::Close(FrameClose { bucket_time_millis, bucket_duration_i }) => {
81 let bucket_duration = bucket_durations[bucket_duration_i];
82 let callback_result = on_bucket_close(bucket_time_millis, bucket_duration).await?;
83 if let RecursiveBucketVisitorCloseCallbackResult::Stop = callback_result {
84 return Ok(false);
85 };
86 }
87
88 Frame::Open(FrameOpen {
89 bucket_time_millis,
90 bucket_duration_i,
91 oldest_bucket_time_millis_allowed,
92 }) => {
93 let bucket_duration = bucket_durations[bucket_duration_i];
94
95 {
97 let next_bucket_duration_i = bucket_duration_i;
98 let next_bucket_time_millis = bucket_time_millis - bucket_duration;
99 let next_oldest_bucket_time_millis_allowed = oldest_bucket_time_millis_allowed;
100
101 if next_bucket_time_millis >= oldest_bucket_time_millis_allowed {
102 frames.push(Frame::Open(FrameOpen {
103 bucket_time_millis: next_bucket_time_millis,
104 bucket_duration_i: next_bucket_duration_i,
105 oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
106 }));
107 }
108 }
109
110 if bucket_time_millis >= time_millis_max_exclusive {
112 continue;
113 }
114
115 let callback_result = on_bucket_open(bucket_time_millis, bucket_duration).await?;
116
117 if let RecursiveBucketVisitorOpenCallbackResult::Stop = callback_result {
119 return Ok(false);
120 };
121
122 frames.push(Frame::Close(FrameClose { bucket_time_millis, bucket_duration_i }));
124
125 if let RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren = callback_result {
127 let next_bucket_duration_i = bucket_duration_i + 1;
128 if next_bucket_duration_i < bucket_durations.len() {
129 let next_bucket_duration = bucket_durations[next_bucket_duration_i];
130 let time_millis_max_bucket = BucketLocation::round_down_to_bucket_start(time_millis_max_exclusive, next_bucket_duration);
131
132 let next_bucket_time_millis = time_millis_max_bucket.min(bucket_time_millis + bucket_duration - next_bucket_duration);
133 let next_oldest_bucket_time_millis_allowed = bucket_time_millis;
134
135 frames.push(Frame::Open(FrameOpen {
136 bucket_time_millis: next_bucket_time_millis,
137 bucket_duration_i: next_bucket_duration_i,
138 oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
139 }));
140 }
141 }
142 }
143 }
144 }
145
146 Ok(true)
147 }
148}
149
#[cfg(test)]
pub mod tests {
    use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
    use crate::tools::buckets::BUCKET_DURATIONS;
    use crate::tools::time::{DurationMillis, MILLIS_IN_DAY, MILLIS_IN_MILLISECOND, MILLIS_IN_MONTH, MILLIS_IN_WEEK, TimeMillis};
    use log::info;

    /// Runs a full walk from time zero up to `time_millis_max_exclusive`,
    /// always descending into children, and returns how many buckets were
    /// opened.
    async fn count_opened_buckets(time_millis_max_exclusive: TimeMillis, bucket_durations: &[DurationMillis]) -> anyhow::Result<i32> {
        let mut opened = 0;

        RecursiveBucketVisitor::visit(
            time_millis_max_exclusive,
            TimeMillis::zero(),
            bucket_durations,
            &mut async |time_millis: TimeMillis, duration_millis: DurationMillis| {
                info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
                opened += 1;
                Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
            },
            &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
        )
        .await?;

        Ok(opened)
    }

    /// Bucket counts for a few windows over two and three duration levels.
    #[tokio::test]
    async fn visitor_test() -> anyhow::Result<()> {
        info!("Just at a new month (so only one month in view)");
        let at_month_boundary = TimeMillis::zero() + MILLIS_IN_MONTH;
        assert_eq!(count_opened_buckets(at_month_boundary, &BUCKET_DURATIONS[1..3]).await?, 5);

        info!("Just after a new month (one entire month and the beginning of one month in view)");
        let just_past_month_boundary = TimeMillis::zero() + MILLIS_IN_MONTH + MILLIS_IN_MILLISECOND;
        assert_eq!(count_opened_buckets(just_past_month_boundary, &BUCKET_DURATIONS[1..3]).await?, 7);

        info!("Just at a new month plus a 2 weeks plus 3 days");
        let mid_second_month = TimeMillis::zero() + MILLIS_IN_MONTH + MILLIS_IN_WEEK.const_mul(2) + MILLIS_IN_DAY.const_mul(3);
        assert_eq!(count_opened_buckets(mid_second_month, &BUCKET_DURATIONS[1..4]).await?, 54);

        Ok(())
    }

    /// One month walked through every duration level from index 1 onwards.
    #[tokio::test]
    async fn visitor_granular_test() -> anyhow::Result<()> {
        info!("Just before the beginning of a new month (so only one month in view) - but mega granular");
        let opened = count_opened_buckets(TimeMillis::zero() + MILLIS_IN_MONTH, &BUCKET_DURATIONS[1..]).await?;

        // Each nesting level contributes the bucket itself plus its children.
        let expected = 1 + 4 * (1 + 7 * (1 + 4 * (1 + 6 * (1 + 4 * (1 + 3 * (1 + 5))))));
        assert_eq!(opened, expected);

        Ok(())
    }

    /// The open callback refuses to descend once buckets are a day or shorter.
    #[tokio::test]
    async fn visitor_granular_with_recurse_sibling_test() -> anyhow::Result<()> {
        info!("Limit the depth we want to go based on a condition");
        let mut opened = 0;

        RecursiveBucketVisitor::visit(
            TimeMillis::zero() + MILLIS_IN_MONTH,
            TimeMillis::zero(),
            &BUCKET_DURATIONS[1..],
            &mut async |time_millis: TimeMillis, duration_millis: DurationMillis| {
                info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
                opened += 1;

                let decision = if duration_millis > MILLIS_IN_DAY {
                    RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren
                }
                else {
                    RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren
                };
                Ok(decision)
            },
            &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
        )
        .await?;

        // One month, four weeks, seven days per week - nothing finer.
        let expected = 1 + 4 * (1 + 7);
        assert_eq!(opened, expected);

        Ok(())
    }

    /// The open callback aborts the walk on its ninth invocation.
    #[tokio::test]
    async fn visitor_granular_with_abort_test() -> anyhow::Result<()> {
        info!("Abort after a few visits");
        let mut opened = 0;

        RecursiveBucketVisitor::visit(
            TimeMillis::zero() + MILLIS_IN_MONTH,
            TimeMillis::zero(),
            &BUCKET_DURATIONS[1..],
            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
                info!("bucket_time_millis: {}, duration_millis: {}", bucket_time_millis, bucket_duration_millis);
                opened += 1;

                let decision = if opened >= 9 {
                    RecursiveBucketVisitorOpenCallbackResult::Stop
                }
                else {
                    RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren
                };
                Ok(decision)
            },
            &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
        )
        .await?;

        // No callback runs after the one that answered Stop.
        assert_eq!(opened, 9);

        Ok(())
    }
}