1use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
21use crate::client::timeline::recent_posts_pen::RecentPostsPen;
22use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
23use crate::tools::buckets::{BucketLocation, BucketType};
24use crate::tools::time::{DurationMillis, TimeMillis, MILLIS_IN_MONTH};
25use crate::tools::types::Id;
26use bytes::Bytes;
27use std::collections::HashSet;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30
/// A paging cursor over one bucketed timeline (one `bucket_type` + `base_id`
/// pair), walking post bundles backwards in time across repeated
/// `get_more_posts` calls while de-duplicating posts already returned.
pub struct SingleTimeline {
    // Kind of bucket this timeline walks (tests use `BucketType::User`).
    bucket_type: BucketType,
    // Base id the bucket locations are keyed by (copied from the `location_id`
    // passed to `new`).
    base_id: Id,
    // Source of encoded post bundles, looked up per `BucketLocation`.
    post_bundle_manager: Arc<dyn PostBundleManager>,
    // Shared pen consulted after bucket traversal for additional matching
    // posts — presumably recent posts not yet folded into bundles; confirm.
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    // Oldest bucket time the traversal window may reach; `TimeMillis::MAX`
    // is the "not yet initialized" sentinel, replaced on the first fetch.
    oldest_allowed_post_bundle_time_millis: TimeMillis,
    // Oldest bucket time whose bundle has actually been processed so far;
    // same `TimeMillis::MAX` sentinel until the first fetch.
    oldest_processed_post_bundle_time_millis: TimeMillis,
    // Ids of every post already handed back to the caller, used to
    // de-duplicate across fetches (and across bundle/pen overlap).
    post_ids_already_seen: HashSet<Id>,
}
54
impl SingleTimeline {
    /// Creates a timeline for `bucket_type` keyed on `location_id`.
    ///
    /// Both "oldest" cursors start at the `TimeMillis::MAX` sentinel and are
    /// lazily initialized to the caller's `time_millis` on the first
    /// `get_more_posts` call.
    pub fn new(bucket_type: BucketType, location_id: &Id, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
        Self {
            bucket_type,
            base_id: *location_id,
            post_bundle_manager,
            recent_posts_pen,
            oldest_allowed_post_bundle_time_millis: TimeMillis::MAX,
            oldest_processed_post_bundle_time_millis: TimeMillis::MAX,
            post_ids_already_seen: HashSet::new(),
        }
    }

    /// Bucket type this timeline walks.
    pub fn bucket_type(&self) -> BucketType {
        self.bucket_type
    }

    /// Base id the timeline's bucket locations are keyed by.
    pub fn base_id(&self) -> Id {
        self.base_id
    }

    /// Fetches the next batch of not-yet-seen encoded posts, walking buckets
    /// recursively from `time_millis` backwards.
    ///
    /// The visit window reaches back `MILLIS_IN_MONTH` past the oldest bucket
    /// allowed so far, so each successive call (even one that finds nothing)
    /// extends the reachable history by roughly one month — the tests rely on
    /// this drift of `oldest_allowed_post_bundle_time_millis`.
    ///
    /// `max_posts` is a soft cap: the stop check runs only after a whole
    /// bundle has been drained, so the returned batch may exceed `max_posts`
    /// by up to one bundle's worth of posts.
    ///
    /// Returns `(bucket_location, encoded_post_bytes)` pairs; errors from
    /// bucket construction or bundle fetching are propagated via `anyhow`.
    pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes)>> {
        let mut encoded_posts = Vec::new();

        // First call: replace the MAX sentinels with the caller's reference time.
        if TimeMillis::MAX == self.oldest_allowed_post_bundle_time_millis {
            self.oldest_allowed_post_bundle_time_millis = time_millis;
        }
        if TimeMillis::MAX == self.oldest_processed_post_bundle_time_millis {
            self.oldest_processed_post_bundle_time_millis = time_millis;
        }

        let time_millis_max = time_millis;
        // Allow the visitor to reach one month further back than last time.
        let time_millis_min = self.oldest_allowed_post_bundle_time_millis - MILLIS_IN_MONTH;

        RecursiveBucketVisitor::visit(
            time_millis_max,
            time_millis_min,
            bucket_durations,
            // Open callback: decides whether to descend into finer-grained
            // child buckets for this (time, duration) node.
            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
                // Track the oldest bucket the traversal has touched at all.
                self.oldest_allowed_post_bundle_time_millis = self.oldest_allowed_post_bundle_time_millis.min(bucket_time_millis);

                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;

                // An overflowed bundle could not hold everything at this
                // granularity, so the real posts live in the child buckets.
                match encoded_post_bundle.header.overflowed {
                    true => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren),
                    false => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren),
                }
            },
            // Close callback: drains the bundle at this node and reports
            // whether the traversal should continue.
            &mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
                // Track the oldest bucket whose posts were actually extracted.
                self.oldest_processed_post_bundle_time_millis = self.oldest_processed_post_bundle_time_millis.min(bucket_time_millis);

                let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
                // NOTE(review): the bundle was already fetched in the open
                // callback; this refetch presumably hits a cache — confirm.
                let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;

                // Posts are packed back-to-back in `encoded_posts_bytes`;
                // `encoded_post_lengths[i]` gives each post's byte length, so
                // slice them out by a running offset.
                let mut extraction_start_i = 0;
                for i in 0..(encoded_post_bundle.header.num_posts as usize) {
                    let extraction_end_i = extraction_start_i + encoded_post_bundle.header.encoded_post_lengths[i];
                    // `insert` returns true only for ids not seen before, so
                    // re-fetched bundles don't produce duplicate posts.
                    if self.post_ids_already_seen.insert(encoded_post_bundle.header.encoded_post_ids[i]) {
                        let encoded_post = encoded_post_bundle.encoded_posts_bytes.slice(extraction_start_i..extraction_end_i);
                        encoded_posts.push((bucket_location.clone(), encoded_post));
                    }
                    // Advance past this post even when it was a duplicate.
                    extraction_start_i = extraction_end_i;
                }

                // Soft cap: checked only between bundles, so a single bundle
                // may push the batch past `max_posts`.
                match encoded_posts.len() < max_posts {
                    true => Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
                    false => Ok(RecursiveBucketVisitorCloseCallbackResult::Stop),
                }
            },
        )
        .await?;

        // Merge in matching posts from the recent-posts pen. The seen-set is
        // passed in (presumably so the pen can pre-filter), and the insert
        // below guards against duplicates regardless.
        // NOTE(review): a write lock is taken here — if `get_matching_posts`
        // only needs `&self`, a read lock would suffice; confirm.
        let pen_posts = self.recent_posts_pen.write().await.get_matching_posts(self.bucket_type, &self.base_id, &self.post_ids_already_seen, time_millis);
        for (bucket_location, encoded_post_bytes, post_id) in pen_posts {
            if self.post_ids_already_seen.insert(post_id) {
                encoded_posts.push((bucket_location, encoded_post_bytes));
            }
        }

        Ok(encoded_posts)
    }

    /// Total number of distinct posts returned across all fetches so far.
    pub fn post_count(&self) -> usize {
        self.post_ids_already_seen.len()
    }

    /// Oldest bucket time the traversal window has reached (open callback).
    pub fn oldest_allowed_post_bundle_time_millis(&self) -> TimeMillis {
        self.oldest_allowed_post_bundle_time_millis
    }

    /// Oldest bucket time whose bundle has been processed (close callback).
    pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
        self.oldest_processed_post_bundle_time_millis
    }
}
167
#[cfg(test)]
pub mod tests {
    //! Tests for `SingleTimeline` paging behavior against a stub bundle
    //! manager. `add_random_stub_post_bundle(&id, duration, offset, n)`
    //! presumably places a bundle of `n` random posts in the bucket of the
    //! given duration at the given epoch-offset string — confirm against the
    //! stub's definition.
    use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
    use crate::client::timeline::recent_posts_pen::RecentPostsPen;
    use crate::client::timeline::single_timeline::SingleTimeline;
    use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
    use crate::tools::time::{TimeMillis, MILLIS_IN_DAY, MILLIS_IN_MONTH, MILLIS_IN_WEEK};
    use crate::tools::types::Id;
    use log::info;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Fresh, empty recent-posts pen — these tests only exercise the bucket
    // traversal path, not the pen merge.
    fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
        Arc::new(RwLock::new(RecentPostsPen::new()))
    }

    /// One bundle, one fetch: all 23 posts come back at once, and both
    /// "oldest" cursors land on the visited bucket's time (epoch offset 0).
    #[tokio::test]
    async fn timeline_single_test() -> anyhow::Result<()> {
        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 5, &BUCKET_DURATIONS[1..]).await?;
        // max_posts = 5 is a soft cap: the whole 23-post bundle is drained
        // before the cap is checked.
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        Ok(())
    }

    /// Several bundles across week buckets: successive fetches page backwards
    /// (newest bundle first), de-duplicate, and — once drained — keep
    /// extending the allowed window one month further back per call.
    #[tokio::test]
    async fn timeline_multiple_test() -> anyhow::Result<()> {
        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 27)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1W", 17)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 6)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 22)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        // Newest bundle (3W, 22 posts) fills the batch; processing stopped there.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 22);
        assert_eq!(single_timeline.post_count(), 22);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("3W")?);
        info!("--- FETCH 2 -----------------------");
        // Next two bundles (2W: 6 + 1W: 17 = 23); earlier posts stay deduped.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 22 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1W")?);
        info!("--- FETCH 3 -----------------------");
        // Oldest bundle (1D, 27 posts) drains; everything is now processed.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 27);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 4 -----------------------");
        // Drained timeline: empty batch, but the window slides back a month.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        Ok(())
    }

    /// Bundles spread across two months, including finer (day) buckets in the
    /// second month: paging crosses the month boundary in monthly strides
    /// (time_millis_min reaches back one month past the oldest allowed).
    #[tokio::test]
    async fn timeline_multiple_months_test() -> anyhow::Result<()> {
        let id = Id::random();
        let stub_post_bundle_manager = StubPostBundleManager::default();
        {
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 5)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 16)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1M1D", 29)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M2W", 21)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_DAY, "1M2W", 7)?;
            stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M3W", 6)?;
        }

        let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
        info!("--- FETCH 1 -----------------------");
        // Newest buckets first: 1M3W (6) + 1M2W week (21) + 1M2W day (7) = 34.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 34);
        assert_eq!(single_timeline.post_count(), 34);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M2W")?);
        info!("--- FETCH 2 -----------------------");
        // The 1M1D month bundle (29) finishes the second month.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 29);
        assert_eq!(single_timeline.post_count(), 34 + 29);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
        info!("--- FETCH 3 -----------------------");
        // Window extends into the first month: 3W (16) + 2W (5) = 21.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 21);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("2W")?);
        info!("--- FETCH 4 -----------------------");
        // Last bundle (1D, 23) drains.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 23);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        info!("--- FETCH 5 -----------------------");
        // Drained: each further fetch walks back exactly one more month.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        info!("--- FETCH 6 -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        Ok(())
    }

    /// A bundle that changes between fetches: new posts appended to an
    /// already-processed bundle are picked up on the next fetch, and
    /// previously-returned posts stay de-duplicated.
    #[tokio::test]
    async fn timeline_bundle_update_test() -> anyhow::Result<()> {
        let id = Id::random();
        let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
        let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager.clone(), empty_pen());

        info!("--- FETCH -----------------------");
        // Nothing stored yet: empty fetch, cursors initialized and advanced.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 0);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);

        let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 12)?;

        info!("--- FETCH -----------------------");
        // The new bundle's 12 posts appear even though the bucket was visited before.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 12);
        assert_eq!(single_timeline.post_count(), 12);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);

        // Append one zero-length post to the existing bundle and store it back.
        post_bundle.header.num_posts += 1;
        post_bundle.header.encoded_post_ids.push(Id::random());
        post_bundle.header.encoded_post_lengths.push(0);
        stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;

        info!("--- FETCH -----------------------");
        // Only the appended post is new; the original 12 are deduped.
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 1);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);

        info!("--- FETCH -----------------------");
        let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
        assert_eq!(posts.len(), 0);
        assert_eq!(single_timeline.post_count(), 13);
        assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
        assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);

        Ok(())
    }
}