// hashiverse_lib/client/timeline/multiple_timeline.rs
use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::client::timeline::single_timeline::SingleTimeline;
use crate::tools::time::{DurationMillis, MILLIS_IN_HOUR, TimeMillis};
use crate::tools::types::Id;
use log::trace;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::RwLock;
use crate::tools::buckets::{BucketLocation, BucketType};

/// Aggregates posts from several `SingleTimeline`s — one per base id — that
/// all share the same bucket type.
pub struct MultipleTimeline {
    // Bucket namespace shared by every contained timeline.
    bucket_type: BucketType,
    // Ids this timeline aggregates; `new` builds one `SingleTimeline` per id.
    base_ids: Vec<Id>,
    // NOTE: `get_more_posts` re-sorts this vector by fetch priority, so after
    // the first fetch its order no longer tracks `base_ids` order.
    single_timelines: Vec<SingleTimeline>,
}
32
33impl MultipleTimeline {
34 pub fn new(bucket_type: BucketType, base_ids: Vec<Id>, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
35 let single_timelines = base_ids.iter().map(|base_id| SingleTimeline::new(bucket_type, &base_id, post_bundle_manager.clone(), recent_posts_pen.clone())).collect();
36 Self { bucket_type, base_ids, single_timelines }
37 }
38
39 pub fn base_ids(&self) -> &Vec<Id> {
40 &self.base_ids
41 }
42
43 pub fn bucket_type(&self) -> BucketType {
44 self.bucket_type
45 }
46
47 pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, max_posts_per_single_timeline: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes)>> {
48 let mut encoded_posts_aggregated = Vec::new();
49
50 self.single_timelines.sort_by_key(|single_timeline| {
52 let date_penalty = time_millis - single_timeline.oldest_processed_post_bundle_time_millis();
53 let post_penalty = MILLIS_IN_HOUR.const_mul(single_timeline.post_count() as i64);
54 date_penalty + post_penalty
55 });
56
57 for single_timeline in &mut self.single_timelines {
58 trace!("Fetching posts from SingleTimeline with base_id={}", single_timeline.base_id());
59 let posts = single_timeline.get_more_posts(time_millis, max_posts_per_single_timeline, bucket_durations).await?;
60
61 encoded_posts_aggregated.extend(posts);
62
63 if encoded_posts_aggregated.len() >= max_posts {
64 break;
65 }
66 }
67
68 Ok(encoded_posts_aggregated)
69 }
70
71 pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
72 self.single_timelines
73 .iter()
74 .map(|single_timeline| single_timeline.oldest_processed_post_bundle_time_millis())
75 .min()
76 .unwrap_or(TimeMillis::MAX)
77 }
78}
79
#[cfg(test)]
pub mod tests {
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::multiple_timeline::MultipleTimeline;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{MILLIS_IN_MONTH, MILLIS_IN_WEEK, TimeMillis};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Fresh, empty recent-posts pen for constructing timelines under test.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        Arc::new(RwLock::new(RecentPostsPen::new()))
    }

    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        let ids = vec![
            Id::from_slice(&[0x11; 32])?,
            Id::from_slice(&[0x22; 32])?,
            Id::from_slice(&[0x33; 32])?,
        ];

        // (timeline index, bucket duration, epoch offset, post count)
        let bundle_specs = [
            (0, MILLIS_IN_MONTH, "5M", 29),
            (1, MILLIS_IN_MONTH, "5M", 23),
            (2, MILLIS_IN_MONTH, "5M", 27),
            (0, MILLIS_IN_MONTH, "4M", 25),
            (1, MILLIS_IN_MONTH, "4M", 21),
            (2, MILLIS_IN_MONTH, "4M", 28),
            (0, MILLIS_IN_MONTH, "3M", 6),
            (1, MILLIS_IN_MONTH, "3M", 9),
            (2, MILLIS_IN_MONTH, "3M", 7),
            (0, MILLIS_IN_MONTH, "2M", 35),
            (1, MILLIS_IN_MONTH, "2M", 9),
            (2, MILLIS_IN_MONTH, "2M", 11),
            (0, MILLIS_IN_WEEK, "2M1W", 8),
            (2, MILLIS_IN_MONTH, "1M", 3),
        ];
        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        for (idx, duration, offset, count) in bundle_specs {
            stub_post_bundle_manager.add_random_stub_post_bundle(&ids[idx], duration, offset, count)?;
        }

        let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());
        let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];

        // Each entry: (log label, expected number of posts from that fetch).
        // Fetch order follows the priority sort inside `get_more_posts`.
        let expected_fetches = [
            ("1)--- FETCH -----------------------", 29),
            ("--- FETCH -----------------------", 23),
            ("--- FETCH -----------------------", 27),
            ("2)--- FETCH -----------------------", 21),
            ("--- FETCH -----------------------", 28),
            ("--- FETCH -----------------------", 25),
            ("3)--- FETCH -----------------------", 9 + 6 + 7),
            ("4)--- FETCH -----------------------", 28),
            ("5) --- FETCH -----------------------", 35),
            ("6) --- FETCH -----------------------", 3),
            ("--- FETCH -----------------------", 0),
        ];
        for (label, expected) in expected_fetches {
            info!("{label}");
            let posts = multiple_timeline
                .get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly)
                .await?;
            assert_eq!(expected, posts.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn oldest_processed_post_bundle_time_millis_is_min_of_single_timelines() -> anyhow::Result<()> {
        let ids = vec![Id::from_slice(&[0x11; 32])?, Id::from_slice(&[0x22; 32])?];

        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "3M", 5)?;
        stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "2M", 5)?;

        let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());
        let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];

        // Before any fetch no bundle has been processed, so the sentinel MAX
        // is reported.
        assert_eq!(multiple_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::MAX);

        let now = TimeMillis::from_epoch_offset_str("4M")?;

        let _posts = multiple_timeline.get_more_posts(now, 20, 5, bucket_durations_starting_at_monthly).await?;
        let oldest_after_first_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();

        let _posts = multiple_timeline.get_more_posts(now, 20, 5, bucket_durations_starting_at_monthly).await?;
        let oldest_after_second_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();

        // The processing frontier only moves backwards (or stays put) as more
        // bundles are consumed.
        assert!(oldest_after_second_fetch <= oldest_after_first_fetch);

        Ok(())
    }
}