// hashiverse_lib/client/timeline/single_timeline.rs
1//! # A single stateful timeline cursor
2//!
3//! `SingleTimeline` walks one feed (a specific user's posts, a specific hashtag, the
4//! mentions of a given client, …) backwards through time, yielding pages of posts via
5//! `get_more_posts()`. It owns:
6//!
7//! - a [`crate::client::timeline::recursive_bucket_visitor::RecursiveBucketVisitor`]
8//!   that walks the hierarchical bucket ladder for the timeline's `(BucketType, base_id)`;
9//! - a `post_ids_already_seen` set that deduplicates across overlapping buckets and
10//!   across pages (important for Sequel / ReplyToPost timelines where posts can surface
11//!   from multiple places);
12//! - a [`crate::client::timeline::recent_posts_pen::RecentPostsPen`] reference so the
13//!   user's own very recent posts appear instantly instead of waiting for the next
14//!   DHT read.
15//!
16//! Timelines never reach an end — the walk just keeps stepping further back in time on
17//! demand (a hard-earned invariant: manually triggered "fetch more" must always produce
18//! *something*, even if it has to go a year further back to find it).
19
20use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
21use crate::client::timeline::recent_posts_pen::RecentPostsPen;
22use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
23use crate::tools::buckets::{BucketLocation, BucketType};
24use crate::tools::time::{DurationMillis, TimeMillis, MILLIS_IN_MONTH};
25use crate::tools::types::Id;
26use bytes::Bytes;
27use std::collections::HashSet;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30
31/// A cursor that walks one feed backward through time, yielding encoded posts page by page.
32///
33/// Every feed in hashiverse — a user's profile, a hashtag, a mention stream, a reply thread
34/// — is represented on the wire as a sequence of time-bucketed post bundles keyed by a
35/// single [`Id`] (the "base id" of the feed) and a [`BucketType`]. `SingleTimeline` is the
36/// client-side state machine that reads such a feed: given a wall-clock `time_millis` and a
37/// desired page size, each call to `get_more_posts` walks to the next-older bucket via
38/// [`RecursiveBucketVisitor`], pulls the bundle through a [`PostBundleManager`], and emits
39/// the posts in reverse chronological order while deduplicating against `post_ids_already_seen`.
40///
41/// It also reads from the shared [`RecentPostsPen`] so posts the local user has just
42/// authored appear at the top of the feed before they have propagated through the network.
43/// For aggregated feeds (following multiple people, multiple hashtags) see `MultipleTimeline`,
44/// which composes several `SingleTimeline`s.
pub struct SingleTimeline {
    // Which kind of feed this cursor walks (user posts, hashtag, mentions, …).
    bucket_type: BucketType,
    // The single id keying the feed on the wire (e.g. the user's or hashtag's id).
    base_id: Id,
    // Source of encoded post bundles, one per bucket location.
    post_bundle_manager: Arc<dyn PostBundleManager>,
    // Shared pen of locally-authored posts that may not yet be visible on the network.
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    // Oldest bucket time any walk has been allowed to reach so far.
    // `TimeMillis::MAX` is the "not yet initialised" sentinel; the first
    // `get_more_posts` call replaces it with the caller-supplied time.
    oldest_allowed_post_bundle_time_millis: TimeMillis,
    // Oldest bucket time whose posts have actually been extracted (i.e. whose
    // close callback ran). Same `TimeMillis::MAX` sentinel before the first call.
    oldest_processed_post_bundle_time_millis: TimeMillis,
    // Every post id already yielded; deduplicates across overlapping buckets,
    // across successive pages, and against the recent-posts pen.
    post_ids_already_seen: HashSet<Id>,
}
54
impl SingleTimeline {
    /// Creates a fresh cursor for the feed keyed by `(bucket_type, location_id)`.
    ///
    /// Both time watermarks start at `TimeMillis::MAX` — a sentinel meaning
    /// "uninitialised" — and are lowered to the caller's clock on the first
    /// `get_more_posts` call.
    pub fn new(bucket_type: BucketType, location_id: &Id, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
        Self {
            bucket_type,
            base_id: *location_id,
            post_bundle_manager,
            recent_posts_pen,
            oldest_allowed_post_bundle_time_millis: TimeMillis::MAX,
            oldest_processed_post_bundle_time_millis: TimeMillis::MAX,
            post_ids_already_seen: HashSet::new(),
        }
    }

    /// The kind of feed this cursor walks.
    pub fn bucket_type(&self) -> BucketType {
        self.bucket_type
    }

    /// The id keying this feed on the wire.
    pub fn base_id(&self) -> Id {
        self.base_id
    }

    /// Walks to the next-older buckets and returns the next page of encoded posts,
    /// each paired with the [`BucketLocation`] it came from.
    ///
    /// - `time_millis`: the caller's "now"; also passed through to the bundle
    ///   manager on every fetch.
    /// - `max_posts`: a *soft* page-size target — a bucket's unseen posts are
    ///   emitted wholesale before the stop condition is re-checked, so the
    ///   returned page can exceed it (the tests rely on this).
    /// - `bucket_durations`: the ladder of bucket granularities the recursive
    ///   visitor descends through when a bundle reports `overflowed`.
    ///
    /// The search window runs from `time_millis` back to one `MILLIS_IN_MONTH`
    /// beyond the oldest bucket any previous call was allowed to reach, so each
    /// call on an exhausted feed still pushes the window a month further back —
    /// this is the "timelines never end" invariant from the module docs.
    ///
    /// # Errors
    /// Propagates failures from [`BucketLocation::new`] and
    /// `PostBundleManager::get_post_bundle`.
    pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes)>> {
        let mut encoded_posts = Vec::new();

        // First call only: replace the TimeMillis::MAX "uninitialised" sentinel
        // with the caller's clock so the window arithmetic below is meaningful.
        if TimeMillis::MAX == self.oldest_allowed_post_bundle_time_millis {
            self.oldest_allowed_post_bundle_time_millis = time_millis;
        }
        if TimeMillis::MAX == self.oldest_processed_post_bundle_time_millis {
            self.oldest_processed_post_bundle_time_millis = time_millis;
        }

        // Window: from "now" back to a month past the oldest previously-allowed
        // bucket, so repeated calls always make backwards progress.
        let time_millis_max = time_millis;
        let time_millis_min = self.oldest_allowed_post_bundle_time_millis - MILLIS_IN_MONTH;

        RecursiveBucketVisitor::visit(
            time_millis_max,
            time_millis_min,
            bucket_durations,
            // on_bucket_open: decide whether to descend into finer-grained child
            // buckets, based on whether this bucket's bundle overflowed.
            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
                // log::info!(
                //     "open {}: bucket_time_millis: {}, bucket_duration_millis: {}, time_millis_min: {}",
                //     self.base_id, bucket_time_millis, bucket_duration_millis, time_millis_min
                // );

                // Keep pushing back the oldest time we have queried - so that future calls are allowed to go back even further than that
                self.oldest_allowed_post_bundle_time_millis = self.oldest_allowed_post_bundle_time_millis.min(bucket_time_millis);

                // Get location's postbundle
                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;

                // Do we keep going deeper? Overflowed bundles spill into child
                // buckets of the next-finer duration, so descend to pick those up.
                match encoded_post_bundle.header.overflowed {
                    true => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren),
                    false => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren),
                }
            },
            // on_bucket_close: extract this bucket's posts (after any children),
            // then decide whether the page is full.
            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
                // log::info!("close {}: bucket_time_millis: {}, bucket_duration_millis: {}", self.base_id, bucket_time_millis, bucket_duration_millis);

                // Keep pushing back the oldest time we have processed
                self.oldest_processed_post_bundle_time_millis = self.oldest_processed_post_bundle_time_millis.min(bucket_time_millis);

                // Get location's postbundle
                // NOTE(review): this is the second fetch of the same location in one
                // walk (open fetched it too) — presumably served from a cache inside
                // the PostBundleManager; confirm, otherwise this doubles DHT reads.
                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;

                // Extract all the posts we have not seen before (some post bundles may add posts over time if they are not yet sealed)
                // The posts are concatenated in `encoded_posts_bytes`; walk the
                // per-post lengths from the header to slice each one out.
                let mut extraction_start_i = 0;
                for i in 0..(encoded_post_bundle.header.num_posts as usize) {
                    let extraction_end_i = extraction_start_i + encoded_post_bundle.header.encoded_post_lengths[i];
                    // `insert` returning true means this id was not yet seen,
                    // so dedup and emission happen in a single set operation.
                    if self.post_ids_already_seen.insert(encoded_post_bundle.header.encoded_post_ids[i]) {
                        let encoded_post = encoded_post_bundle.encoded_posts_bytes.slice(extraction_start_i..extraction_end_i);
                        encoded_posts.push((bucket_location.clone(), encoded_post));
                    }
                    extraction_start_i = extraction_end_i;
                }

                // Do we have enough posts this round? (Checked only after the whole
                // bucket is emitted, which is why a page can exceed `max_posts`.)
                match encoded_posts.len() < max_posts {
                    true => Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
                    false => Ok(RecursiveBucketVisitorCloseCallbackResult::Stop),
                }
            },
        )
        .await?;

        // Consult the recent posts pen for any posts we submitted that haven't yet appeared from the network
        // NOTE(review): a write lock is taken — presumably get_matching_posts
        // mutates pen state (e.g. prunes stale entries); confirm, else a read
        // lock would suffice.
        let pen_posts = self.recent_posts_pen.write().await.get_matching_posts(self.bucket_type, &self.base_id, &self.post_ids_already_seen, time_millis);
        for (bucket_location, encoded_post_bytes, post_id) in pen_posts {
            // The pen already received the seen-set, but inserting here both
            // re-checks and records these ids as seen for future pages.
            if self.post_ids_already_seen.insert(post_id) {
                encoded_posts.push((bucket_location, encoded_post_bytes));
            }
        }

        Ok(encoded_posts)
    }

    /// Total number of distinct posts yielded by this cursor so far.
    pub fn post_count(&self) -> usize {
        self.post_ids_already_seen.len()
    }

    /// Oldest bucket time any walk has been allowed to reach
    /// (`TimeMillis::MAX` until the first `get_more_posts` call).
    pub fn oldest_allowed_post_bundle_time_millis(&self) -> TimeMillis {
        self.oldest_allowed_post_bundle_time_millis
    }

    /// Oldest bucket time whose posts have actually been extracted
    /// (`TimeMillis::MAX` until the first `get_more_posts` call).
    pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
        self.oldest_processed_post_bundle_time_millis
    }
}
167
#[cfg(test)]
pub mod tests {
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::client::timeline::single_timeline::SingleTimeline;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{TimeMillis, MILLIS_IN_DAY, MILLIS_IN_MONTH, MILLIS_IN_WEEK};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// A pen with no locally-authored posts, so the tests exercise only the
    /// bucket-walk path of `get_more_posts`.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        Arc::new(RwLock::new(RecentPostsPen::new()))
    }

    /// One bucket, one fetch: all 23 posts come back in a single page even
    /// though `max_posts` is 5 — confirming the page size is a soft target
    /// (a bucket's posts are emitted wholesale before the stop check).
    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 5, &BUCKET_DURATIONS[1..]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        Ok(())
    }

    /// Several buckets inside one month: successive fetches drain newest-first
    /// until the page target is reached, watermarks track allowed vs processed
    /// separately, and a fetch on the exhausted feed returns nothing while
    /// pushing the allowed window one month further back ("timelines never end").
    #[tokio::test]
    async fn timeline_multiple_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 27)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1W", 17)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 6)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 22)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 22);
        assert_eq!(single_timeline.post_count(), 22);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("3W")?);
        info!("--- FETCH 2 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 22 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1W")?);
        info!("--- FETCH 3 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 27);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 4 -----------------------");
        // Feed exhausted: no posts, but the allowed window extends a month back.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        Ok(())
    }

    /// Buckets spanning two months and three duration granularities (day, week,
    /// month): the walk crosses month boundaries, and every empty fetch at the
    /// end keeps extending the window a month at a time.
    #[tokio::test]
    async fn timeline_multiple_months_test() -> anyhow::Result<()> {
        // configure_logging();

        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 5)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 16)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1M1D", 29)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M2W", 21)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_DAY, "1M2W", 7)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M3W", 6)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 34);
        assert_eq!(single_timeline.post_count(), 34);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M2W")?);
        info!("--- FETCH 2 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 29);
        assert_eq!(single_timeline.post_count(), 34 + 29);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        info!("--- FETCH 3 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 21);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("2W")?);
        info!("--- FETCH 4 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 5 -----------------------");
        // Exhausted: each further fetch returns nothing and steps back a month.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        info!("--- FETCH 6 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        Ok(())
    }

    /// An unsealed bundle that gains a post between fetches: per-post dedup
    /// (rather than per-bucket dedup) means a later fetch picks up only the
    /// newly-added post, even though the bucket was already visited.
    #[tokio::test]
    async fn timeline_bundle_update_test() -> anyhow::Result<()> {
        // configure_logging();

        // Note that each time we do the request, the "distance back" it has searched should be increasing by 1 month because we never actually get 20 posts...

        let id = Id::random();
        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager.clone(), empty_pen());

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 0);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 12)?;

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 12);
        assert_eq!(single_timeline.post_count(), 12);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        // Add a post to the old bundle - as if someone has posted since we last checked
        post_bundle.header.num_posts += 1;
        post_bundle.header.encoded_post_ids.push(Id::random());
        post_bundle.header.encoded_post_lengths.push(0);
        stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;

        info!("--- FETCH -----------------------");
        // Only the one new post surfaces; the 12 already-seen ids are deduped.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 1);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);

        Ok(())
    }
}