Skip to main content

hashiverse_lib/client/timeline/
recursive_bucket_visitor.rs

1//! # Depth-first walk over the hierarchical bucket ladder
2//!
3//! The pure, storage-free algorithm behind every timeline. Given a time range and the
4//! bucket-duration sequence from [`crate::tools::buckets::BUCKET_DURATIONS`],
5//! `RecursiveBucketVisitor` visits buckets in depth-first order — coarse granularity
6//! first, drilling into finer granularity where the open callback asks for it.
7//!
8//! Two callbacks steer the walk:
9//!
10//! - **Open** — called before descending into a bucket. Returns
11//!   `ContinueWithChildren`, `ContinueWithoutChildren` (skip this subtree — empty
12//!   buckets), or `Stop` (abort the whole walk — enough posts collected).
13//! - **Close** — called when leaving a bucket, so timelines can decide whether to
14//!   back off to coarser granularity or continue stepping within the current level.
15//!
16//! The visitor has zero knowledge of posts, caches, networks, or deduplication — it
17//! is a plain tree walker; everything else is implemented on top of it by
18//! [`crate::client::timeline::single_timeline::SingleTimeline`].
19
20use crate::tools::buckets::BucketLocation;
21use crate::tools::time::{DurationMillis, TimeMillis};
22
23pub enum RecursiveBucketVisitorOpenCallbackResult {
24    ContinueWithChildren,
25    ContinueWithoutChildren,
26    Stop,
27}
28
29pub enum RecursiveBucketVisitorCloseCallbackResult {
30    Continue,
31    Stop,
32}
33
34pub struct RecursiveBucketVisitor {}
35
36impl RecursiveBucketVisitor {
37    pub async fn visit<FOpen, FClose>(time_millis_max_exclusive: TimeMillis, time_millis_min: TimeMillis, bucket_durations: &[DurationMillis], on_bucket_open: &mut FOpen, on_bucket_close: &mut FClose) -> anyhow::Result<bool>
38    where
39        FOpen: AsyncFnMut(TimeMillis, DurationMillis) -> anyhow::Result<RecursiveBucketVisitorOpenCallbackResult>,
40        FClose: AsyncFnMut(TimeMillis, DurationMillis) -> anyhow::Result<RecursiveBucketVisitorCloseCallbackResult>,
41    {
42        struct FrameOpen {
43            bucket_time_millis: TimeMillis,
44            bucket_duration_i: usize,
45            oldest_bucket_time_millis_allowed: TimeMillis,
46        }
47
48        struct FrameClose {
49            bucket_time_millis: TimeMillis,
50            bucket_duration_i: usize,
51        }
52
53        enum Frame {
54            Open(FrameOpen),
55            Close(FrameClose),
56        }
57
58        anyhow::ensure!(!bucket_durations.is_empty(), "bucket_durations must have at least one element");
59
60        let mut frames = Vec::new();
61
62        // Prime the stack
63        {
64            let next_bucket_duration_i = 0;
65            let next_bucket_duration = bucket_durations[next_bucket_duration_i];
66
67            let next_latest_bucket_time_millis = BucketLocation::round_down_to_bucket_start(time_millis_max_exclusive, next_bucket_duration);
68            let next_oldest_bucket_time_millis_allowed = BucketLocation::round_down_to_bucket_start(time_millis_min, next_bucket_duration);
69
70            frames.push(Frame::Open(FrameOpen {
71                bucket_time_millis: next_latest_bucket_time_millis,
72                bucket_duration_i: next_bucket_duration_i,
73                oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
74            }));
75        }
76
77        // Pump the stack
78        while let Some(frame) = frames.pop() {
79            match frame {
80                Frame::Close(FrameClose { bucket_time_millis, bucket_duration_i }) => {
81                    let bucket_duration = bucket_durations[bucket_duration_i];
82                    let callback_result = on_bucket_close(bucket_time_millis, bucket_duration).await?;
83                    if let RecursiveBucketVisitorCloseCallbackResult::Stop = callback_result {
84                        return Ok(false);
85                    };
86                }
87
88                Frame::Open(FrameOpen {
89                    bucket_time_millis,
90                    bucket_duration_i,
91                    oldest_bucket_time_millis_allowed,
92                }) => {
93                    let bucket_duration = bucket_durations[bucket_duration_i];
94
95                    // Push our next sibling onto the stack
96                    {
97                        let next_bucket_duration_i = bucket_duration_i;
98                        let next_bucket_time_millis = bucket_time_millis - bucket_duration;
99                        let next_oldest_bucket_time_millis_allowed = oldest_bucket_time_millis_allowed;
100
101                        if next_bucket_time_millis >= oldest_bucket_time_millis_allowed {
102                            frames.push(Frame::Open(FrameOpen {
103                                bucket_time_millis: next_bucket_time_millis,
104                                bucket_duration_i: next_bucket_duration_i,
105                                oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
106                            }));
107                        }
108                    }
109
110                    // Is this bucket too recent to be allowed? (sometimes they request exactly the start of the next bucket - e.g. 1M - we dont want to include that month)
111                    if bucket_time_millis >= time_millis_max_exclusive {
112                        continue;
113                    }
114
115                    let callback_result = on_bucket_open(bucket_time_millis, bucket_duration).await?;
116
117                    // If the user has elected to bail early
118                    if let RecursiveBucketVisitorOpenCallbackResult::Stop = callback_result {
119                        return Ok(false);
120                    };
121
122                    // Push our close onto the stack
123                    frames.push(Frame::Close(FrameClose { bucket_time_millis, bucket_duration_i }));
124
125                    // If the user wants to go more granular
126                    if let RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren = callback_result {
127                        let next_bucket_duration_i = bucket_duration_i + 1;
128                        if next_bucket_duration_i < bucket_durations.len() {
129                            let next_bucket_duration = bucket_durations[next_bucket_duration_i];
130                            let time_millis_max_bucket = BucketLocation::round_down_to_bucket_start(time_millis_max_exclusive, next_bucket_duration);
131
132                            let next_bucket_time_millis = time_millis_max_bucket.min(bucket_time_millis + bucket_duration - next_bucket_duration);
133                            let next_oldest_bucket_time_millis_allowed = bucket_time_millis;
134
135                            frames.push(Frame::Open(FrameOpen {
136                                bucket_time_millis: next_bucket_time_millis,
137                                bucket_duration_i: next_bucket_duration_i,
138                                oldest_bucket_time_millis_allowed: next_oldest_bucket_time_millis_allowed,
139                            }));
140                        }
141                    }
142                }
143            }
144        }
145
146        Ok(true)
147    }
148}
149
150#[cfg(test)]
151pub mod tests {
152    use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
153    use crate::tools::buckets::BUCKET_DURATIONS;
154    use crate::tools::time::{DurationMillis, MILLIS_IN_DAY, MILLIS_IN_MILLISECOND, MILLIS_IN_MONTH, MILLIS_IN_WEEK, TimeMillis};
155    use log::info;
156
157    #[tokio::test]
158    async fn visitor_test() -> anyhow::Result<()> {
159        // configure_logging();
160
161        {
162            info!("Just at a new month (so only one month in view)");
163            let mut total_buckets = 0;
164
165            RecursiveBucketVisitor::visit(
166                TimeMillis::zero() + MILLIS_IN_MONTH,
167                TimeMillis::zero(),
168                &BUCKET_DURATIONS[1..3],
169                &mut async |time_millis, duration_millis| {
170                    info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
171                    total_buckets += 1;
172                    Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
173                },
174                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
175            )
176            .await?;
177
178            assert_eq!(total_buckets, 5);
179        }
180
181        {
182            info!("Just after a new month (one entire month and the beginning of one month in view)");
183            let mut total_buckets = 0;
184
185            RecursiveBucketVisitor::visit(
186                TimeMillis::zero() + MILLIS_IN_MONTH + MILLIS_IN_MILLISECOND,
187                TimeMillis::zero(),
188                &BUCKET_DURATIONS[1..3],
189                &mut async |time_millis: TimeMillis, duration_millis: DurationMillis| {
190                    info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
191                    total_buckets += 1;
192                    Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
193                },
194                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
195            )
196            .await?;
197
198            assert_eq!(total_buckets, 7);
199        }
200
201        {
202            info!("Just at a new month plus a 2 weeks plus 3 days");
203            let mut total_buckets = 0;
204
205            RecursiveBucketVisitor::visit(
206                TimeMillis::zero() + MILLIS_IN_MONTH + MILLIS_IN_WEEK.const_mul(2) + MILLIS_IN_DAY.const_mul(3),
207                TimeMillis::zero(),
208                &BUCKET_DURATIONS[1..4],
209                &mut async |time_millis, duration_millis| {
210                    info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
211                    total_buckets += 1;
212                    Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
213                },
214                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
215            )
216            .await?;
217
218            assert_eq!(total_buckets, 54);
219        }
220
221        Ok(())
222    }
223
224    #[tokio::test]
225    async fn visitor_granular_test() -> anyhow::Result<()> {
226        // configure_logging();
227
228        {
229            info!("Just before the beginning of a new month (so only one month in view) - but mega granular");
230            let mut total_buckets = 0;
231
232            RecursiveBucketVisitor::visit(
233                TimeMillis::zero() + MILLIS_IN_MONTH,
234                TimeMillis::zero(),
235                &BUCKET_DURATIONS[1..],
236                &mut async |time_millis: TimeMillis, duration_millis: DurationMillis| {
237                    info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
238                    total_buckets += 1;
239                    Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
240                },
241                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
242            )
243            .await?;
244
245            let total_expected = 1 + 4 * (1 + 7 * (1 + 4 * (1 + 6 * (1 + 4 * (1 + 3 * (1 + 5))))));
246            assert_eq!(total_buckets, total_expected);
247        }
248
249        Ok(())
250    }
251
252    #[tokio::test]
253    async fn visitor_granular_with_recurse_sibling_test() -> anyhow::Result<()> {
254        // configure_logging();
255
256        {
257            info!("Limit the depth we want to go based on a condition");
258            let mut total_buckets = 0;
259
260            RecursiveBucketVisitor::visit(
261                TimeMillis::zero() + MILLIS_IN_MONTH,
262                TimeMillis::zero(),
263                &BUCKET_DURATIONS[1..],
264                &mut async |time_millis: TimeMillis, duration_millis: DurationMillis| {
265                    info!("time_millis: {}, duration_millis: {}", time_millis, duration_millis);
266                    total_buckets += 1;
267
268                    if duration_millis > MILLIS_IN_DAY {
269                        Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
270                    }
271                    else {
272                        Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren)
273                    }
274                },
275                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
276            )
277            .await?;
278
279            let total_expected = 1 + 4 * (1 + 7);
280            assert_eq!(total_buckets, total_expected);
281        }
282
283        Ok(())
284    }
285
286    #[tokio::test]
287    async fn visitor_granular_with_abort_test() -> anyhow::Result<()> {
288        // configure_logging();
289
290        {
291            info!("Abort after a few visits");
292            let mut total_buckets = 0;
293
294            RecursiveBucketVisitor::visit(
295                TimeMillis::zero() + MILLIS_IN_MONTH,
296                TimeMillis::zero(),
297                &BUCKET_DURATIONS[1..],
298                &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
299                    info!("bucket_time_millis: {}, duration_millis: {}", bucket_time_millis, bucket_duration_millis);
300                    total_buckets += 1;
301
302                    if total_buckets >= 9 {
303                        Ok(RecursiveBucketVisitorOpenCallbackResult::Stop)
304                    }
305                    else {
306                        Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren)
307                    }
308                },
309                &mut async |_time_millis: TimeMillis, _duration_millis: DurationMillis| Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
310            )
311            .await?;
312
313            assert_eq!(total_buckets, 9);
314        }
315
316        Ok(())
317    }
318}