// hashiverse_lib/client/timeline/multiple_timeline.rs
//! # Merging multiple timelines into one feed
//!
//! `MultipleTimeline` composes several
//! [`crate::client::timeline::single_timeline::SingleTimeline`] cursors into one
//! chronologically merged stream. Used for:
//!
//! - "following" feeds (one `SingleTimeline` per followed user),
//! - "trending hashtags" (one `SingleTimeline` per hashtag),
//! - any other feed that fans out over several `(BucketType, base_id)` sources.
//!
//! Each call to `get_more_posts` sorts the component timelines by age + post-count
//! penalty (so slow / empty sub-timelines don't hog quota), pulls a quota of posts from
//! each, and then merges the results by timestamp. Deduplication and the
//! recent-posts-pen integration are delegated to the sub-timelines.

16use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
17use crate::client::timeline::recent_posts_pen::RecentPostsPen;
18use crate::client::timeline::single_timeline::SingleTimeline;
19use crate::tools::time::{DurationMillis, MILLIS_IN_HOUR, TimeMillis};
20use crate::tools::types::Id;
21use log::trace;
22use std::sync::Arc;
23use bytes::Bytes;
24use tokio::sync::RwLock;
25use crate::tools::buckets::{BucketLocation, BucketType};
26
/// Merges several [`SingleTimeline`] cursors (one per base id, all over the same
/// bucket type) into a single feed.
pub struct MultipleTimeline {
    // The bucket family every component timeline reads from.
    bucket_type: BucketType,
    // One id per source feed; each id backs one entry in `single_timelines`.
    base_ids: Vec<Id>,
    // The component cursors; reordered by polling priority on each `get_more_posts` call.
    single_timelines: Vec<SingleTimeline>,
}
32
33impl MultipleTimeline {
34    pub fn new(bucket_type: BucketType, base_ids: Vec<Id>, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
35        let single_timelines = base_ids.iter().map(|base_id| SingleTimeline::new(bucket_type, &base_id, post_bundle_manager.clone(), recent_posts_pen.clone())).collect();
36        Self { bucket_type, base_ids, single_timelines }
37    }
38
39    pub fn base_ids(&self) -> &Vec<Id> {
40        &self.base_ids
41    }
42
43    pub fn bucket_type(&self) -> BucketType {
44        self.bucket_type
45    }
46
47    pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, max_posts_per_single_timeline: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes)>> {
48        let mut encoded_posts_aggregated = Vec::new();
49
50        // Work out the priority of the SingleTimelines to poll
51        self.single_timelines.sort_by_key(|single_timeline| {
52            let date_penalty = time_millis - single_timeline.oldest_processed_post_bundle_time_millis();
53            let post_penalty = MILLIS_IN_HOUR.const_mul(single_timeline.post_count() as i64);
54            date_penalty + post_penalty
55        });
56
57        for single_timeline in &mut self.single_timelines {
58            trace!("Fetching posts from SingleTimeline with base_id={}", single_timeline.base_id());
59            let posts = single_timeline.get_more_posts(time_millis, max_posts_per_single_timeline, bucket_durations).await?;
60
61            encoded_posts_aggregated.extend(posts);
62
63            if encoded_posts_aggregated.len() >= max_posts {
64                break;
65            }
66        }
67
68        Ok(encoded_posts_aggregated)
69    }
70
71    pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
72        self.single_timelines
73            .iter()
74            .map(|single_timeline| single_timeline.oldest_processed_post_bundle_time_millis())
75            .min()
76            .unwrap_or(TimeMillis::MAX)
77    }
78}
79
#[cfg(test)]
pub mod tests {
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::multiple_timeline::MultipleTimeline;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{MILLIS_IN_MONTH, MILLIS_IN_WEEK, TimeMillis};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // A fresh recent-posts pen with nothing penned, shared by the tests below.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        Arc::new(RwLock::new(RecentPostsPen::new()))
    }

    // Exercises the fetch ordering of `MultipleTimeline::get_more_posts` across three
    // stub sources. The expected post counts below are driven entirely by the stub
    // bundles set up at the top and the age + post-count penalty ordering: each fetch
    // should drain exactly one stub bundle (or aggregate several small ones) in the
    // priority order annotated on the `add_random_stub_post_bundle` lines.
    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        // configure_logging();

        let ids = vec![Id::from_slice(&[0x11; 32])?, Id::from_slice(&[0x22; 32])?, Id::from_slice(&[0x33; 32])?];

        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        {
            // Fetch 1
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "5M", 29)?; // Fetch 1.1
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "5M", 23)?; // Fetch 1.2
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "5M", 27)?; // Fetch 1.3

            // Fetch 2
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "4M", 25)?; // Fetch 2.3 - Accumulative 29
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "4M", 21)?; // Fetch 2.1 - Accumulative 23
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "4M", 28)?; // Fetch 2.2 - Accumulative 27

            // Fetch 3
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "3M", 6)?; // Fetch 3.2 - Accumulative 54
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "3M", 9)?; // Fetch 3.1 - Accumulative 44
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "3M", 7)?; // Fetch 3.3 - Accumulative 55

            // Fetch 4
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "2M", 35)?; // Fetch 5.1 - Accumulative 68
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "2M", 9)?; // Fetch 4.1 - Accumulative 54
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "2M", 11)?; // Fetch 4.3 - Accumulative 62
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_WEEK, "2M1W", 8)?; // Fetch 4.2 - Accumulative 60

            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "1M", 3)?; // Fetch 6
        }

        let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());

        // The stubs have been set up around buckets that have months as the most granular...
        let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];

        // Check the ordering of the initial ties
        // (no posts processed yet, so priority falls back to base-id/setup order)
        {
            info!("1)--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(29, posts.len());
            info!("--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(23, posts.len());
            info!("--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(27, posts.len());
        }

        // Check the ordering of the "num posts" penalty
        // (the timeline that has delivered the fewest posts so far is polled first)
        {
            info!("2)--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(21, posts.len());
            info!("--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(28, posts.len());
            info!("--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(25, posts.len());
        }

        // Check the next "batch" that will need to be aggregated to reach enough posts for the batch
        // (the 3M bundles are small, so three sub-timelines contribute: 9 + 6 + 7 = 22 >= 20)
        {
            info!("3)--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(9 + 6 + 7, posts.len());
        }

        {
            info!("4)--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(28, posts.len());
        }

        // Then the rump posts
        // (the remaining large 2M bundle, the final 1M bundle, and finally nothing left)
        {
            info!("5) --- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(35, posts.len());

            info!("6) --- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(3, posts.len());

            info!("--- FETCH -----------------------");
            let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
            assert_eq!(0, posts.len());
        }

        Ok(())
    }

    // Verifies that the aggregate "oldest processed" watermark is the minimum across
    // the component timelines, starting at TimeMillis::MAX before anything is fetched
    // and moving monotonically backwards as older bundles are processed.
    #[tokio::test]
    async fn oldest_processed_post_bundle_time_millis_is_min_of_single_timelines() -> anyhow::Result<()> {
        let ids = vec![Id::from_slice(&[0x11; 32])?, Id::from_slice(&[0x22; 32])?];

        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        // id[0] has posts at 3M, id[1] has posts at 2M — so after fetching, id[1] will have the older processed time
        stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "3M", 5)?;
        stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "2M", 5)?;

        let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());
        let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];

        // Before any fetches, should be TimeMillis::MAX
        assert_eq!(multiple_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::MAX);

        // First fetch — processes id[0] at 3M
        let _posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("4M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
        let oldest_after_first_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();

        // Second fetch — processes id[1] at 2M (older)
        let _posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("4M")?, 20, 5, &bucket_durations_starting_at_monthly).await?;
        let oldest_after_second_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();

        // The oldest should have moved further back after processing id[1]
        assert!(oldest_after_second_fetch <= oldest_after_first_fetch);

        Ok(())
    }
}