hashiverse_lib/client/meta_post/
meta_post_manager.rs1use std::collections::HashMap;
16use std::sync::Arc;
17use log::{info, warn};
18use crate::client::client_storage::client_storage::{self as cs, ClientStorage, BUCKET_META_POST_PUBLIC, BUCKET_CONFIG, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE};
19use crate::client::key_locker::key_locker::KeyLocker;
20use crate::client::meta_post::meta_post::{MetaPost, MetaPostPrivateV1, MetaPostPublicV1, MetaPostV1, VersionedField, merge_public, merge_private};
21use crate::client::meta_post::meta_post_crypto;
22use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
23use crate::protocol::posting::encoded_post::EncodedPostV1;
24use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketType};
25use crate::tools::client_id::ClientId;
26use crate::tools::json;
27use crate::tools::runtime_services::RuntimeServices;
28use crate::tools::time::TimeMillis;
29use crate::tools::types::{Id, Salt};
30
31pub struct MetaPostManager {
60 runtime_services: Arc<RuntimeServices>,
61 client_storage: Arc<dyn ClientStorage>,
62 key_locker: Arc<dyn KeyLocker>,
63 client_id: ClientId,
64}
65
66impl MetaPostManager {
67 pub fn new(
68 runtime_services: Arc<RuntimeServices>,
69 client_storage: Arc<dyn ClientStorage>,
70 key_locker: Arc<dyn KeyLocker>,
71 client_id: ClientId,
72 ) -> Self {
73 Self { runtime_services, client_storage, key_locker, client_id }
74 }
75
76 fn now(&self) -> TimeMillis {
77 self.runtime_services.time_provider.current_time_millis()
78 }
79
80 fn public_config_key(&self) -> String {
81 cs::config_key_for_user(self.client_id.id, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC)
82 }
83
84 fn private_config_key(&self) -> String {
85 cs::config_key_for_user(self.client_id.id, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE)
86 }
87
88 pub async fn get_public(&self) -> anyhow::Result<MetaPostPublicV1> {
93 let public = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &self.public_config_key(), TimeMillis::zero()).await?;
94 Ok(public.unwrap_or_else(MetaPostPublicV1::empty))
95 }
96
97 async fn put_public(&self, public: &MetaPostPublicV1) -> anyhow::Result<()> {
98 cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &self.public_config_key(), public, TimeMillis::zero()).await
99 }
100
101 pub async fn get_private(&self) -> anyhow::Result<MetaPostPrivateV1> {
102 let private = cs::get_struct::<MetaPostPrivateV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &self.private_config_key(), TimeMillis::zero()).await?;
103 Ok(private.unwrap_or_else(MetaPostPrivateV1::empty))
104 }
105
106 async fn put_private(&self, private: &MetaPostPrivateV1) -> anyhow::Result<()> {
107 cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &self.private_config_key(), private, TimeMillis::zero()).await
108 }
109
110 pub async fn set_bio(&self, nickname: String, status: String, selfie: String, avatar: String) -> anyhow::Result<()> {
117 info!("set_bio: nickname={}, status={}, selfie={}, avatar={}", nickname, status, selfie, avatar);
118
119 let time_millis = self.now();
120 let local_public = self.get_public().await?;
121 let incoming_public = MetaPostPublicV1::from_bio(time_millis, nickname, status, selfie, avatar);
122 let merged_public = merge_public(&local_public, &incoming_public);
123
124 self.put_public(&merged_public).await?;
126
127 cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &self.client_id.id.to_hex_str(), &merged_public, time_millis).await?;
129
130 Ok(())
131 }
132
133 pub async fn get_meta_post_public(&self, id: Id) -> anyhow::Result<Option<MetaPostPublicV1>> {
134 let meta_post_public = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id.to_hex_str(), self.now()).await?;
135 Ok(meta_post_public)
136 }
137
138 pub async fn get_all_meta_post_publics(&self) -> anyhow::Result<Vec<(String, MetaPostPublicV1)>> {
139 let mut meta_post_publics = Vec::new();
140 let keys = self.client_storage.keys(BUCKET_META_POST_PUBLIC).await?;
141 for key in keys {
142 let result: anyhow::Result<()> = try {
143 if let Some(meta_post_public) = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &key, TimeMillis::zero()).await? {
144 meta_post_publics.push((key.clone(), meta_post_public));
145 }
146 };
147 if let Err(e) = result {
148 warn!("Failed to decode MetaPostPublicV1 with key {}: {}", key, e);
149 }
150 }
151 Ok(meta_post_publics)
152 }
153
154 pub async fn build_meta_post_json(&self) -> anyhow::Result<String> {
161 info!("build_meta_post_json");
162
163 let public = self.get_public().await?;
164 let private = self.get_private().await?;
165 let salt = Salt::random();
166 let private_encrypted = meta_post_crypto::encrypt_private_section(self.key_locker.as_ref(), &salt, &private).await?;
167
168 let meta_post = MetaPostV1::new(
169 self.client_id.id_hex(),
170 salt,
171 public.clone(),
172 private_encrypted,
173 );
174
175 cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &self.client_id.id.to_hex_str(), &public, self.now()).await?;
177
178 json::struct_to_string(&meta_post)
179 }
180
181 pub async fn should_auto_publish(&self, post_bundle_manager: &dyn PostBundleManager) -> anyhow::Result<bool> {
184 let timestamp = self.now();
185
186 let bucket_durations = bucket_durations_for_type(BucketType::User);
187 let bucket_duration = bucket_durations.first().ok_or_else(|| anyhow::anyhow!("No bucket durations for User type"))?;
188 let bucket_location = generate_bucket_location(BucketType::User, self.client_id.id, *bucket_duration, timestamp)?;
189
190 let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;
191
192 if post_bundle.header.overflowed || post_bundle.header.sealed {
193 info!("Current bucket is full/sealed, skipping auto-publish of MetaPostV1");
194 return Ok(false);
195 }
196
197 let mut offset = 0;
198 for i in 0..(post_bundle.header.num_posts as usize) {
199 let len = post_bundle.header.encoded_post_lengths[i];
200 let post_bytes = post_bundle.encoded_posts_bytes.slice(offset..offset + len);
201 offset += len;
202
203 let decode_result = EncodedPostV1::decode_from_bytes(post_bytes, &bucket_location.base_id, true, true);
204 if let Ok(encoded_post) = decode_result {
205 if let Ok(post_client_id) = encoded_post.header.client_id() {
206 if post_client_id.id == self.client_id.id {
207 if let Ok(MetaPost::MetaPostV1(_)) = MetaPost::try_parse_meta_post(&encoded_post.post) {
208 info!("MetaPostV1 already exists in current bucket, skipping auto-publish");
209 return Ok(false);
210 }
211 }
212 }
213 }
214 }
215
216 info!("No MetaPostV1 found in most-recent, most-granular bucket, should auto-publish");
217 Ok(true)
218 }
219
220 pub async fn process_incoming_meta_post(&self, meta_post_v1: &MetaPostV1, post_client_id: &ClientId) -> anyhow::Result<()> {
228 let id = post_client_id.id.to_hex_str();
229 let now = self.now();
230
231 let incoming_timestamp = meta_post_v1.public.max_timestamp();
233 let existing_timestamp = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id, now).await?.map(|b| b.max_timestamp());
234 if existing_timestamp.is_none_or(|t| incoming_timestamp > t) {
235 cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id, &meta_post_v1.public, now).await?;
236 }
237
238 if post_client_id.id == self.client_id.id {
240 let local_public = self.get_public().await?;
242 let merged_public = merge_public(&local_public, &meta_post_v1.public);
243 self.put_public(&merged_public).await?;
244
245 match meta_post_crypto::decrypt_private_section(self.key_locker.as_ref(), &meta_post_v1.encryption_salt, &meta_post_v1.private_encrypted).await {
247 Ok(incoming_private) => {
248 let local_private = self.get_private().await?;
249 let merged_private = merge_private(&local_private, &incoming_private, now);
250 self.put_private(&merged_private).await?;
251 }
252 Err(e) => {
253 warn!("Failed to decrypt own MetaPostV1 private section: {}", e);
254 }
255 }
256 }
257
258 Ok(())
259 }
260
261 pub async fn get_followed_client_ids(&self) -> anyhow::Result<Vec<Id>> {
266 let private = self.get_private().await?;
267 let client_ids: Vec<Id> = private.followed_client_ids.iter()
268 .filter(|(_key, field)| field.value == Some(true))
269 .map(|(key, _field)| Id::from_hex_str(key))
270 .collect::<anyhow::Result<Vec<_>>>()?;
271 Ok(client_ids)
272 }
273
274 pub async fn set_followed_client_ids(&self, client_ids: Vec<Id>) -> anyhow::Result<()> {
275 let timestamp = self.now();
276 let mut private = self.get_private().await?;
277
278 for (_key, field) in private.followed_client_ids.iter_mut() {
279 if field.value == Some(true) {
280 *field = VersionedField::tombstone(timestamp);
281 }
282 }
283 for client_id in &client_ids {
284 private.followed_client_ids.insert(client_id.to_hex_str(), VersionedField::new(true, timestamp));
285 }
286
287 self.put_private(&private).await
288 }
289
290 pub async fn set_followed_client_id(&self, client_id: Id, is_followed: bool) -> anyhow::Result<()> {
291 let timestamp = self.now();
292 let mut private = self.get_private().await?;
293
294 let field = if is_followed {
295 VersionedField::new(true, timestamp)
296 } else {
297 VersionedField::tombstone(timestamp)
298 };
299 private.followed_client_ids.insert(client_id.to_hex_str(), field);
300
301 self.put_private(&private).await
302 }
303
304 pub async fn get_followed_hashtags(&self) -> anyhow::Result<Vec<String>> {
309 let private = self.get_private().await?;
310 let hashtags: Vec<String> = private.followed_hashtags.iter()
311 .filter(|(_key, field)| field.value == Some(true))
312 .map(|(key, _field)| key.clone())
313 .collect();
314 Ok(hashtags)
315 }
316
317 pub async fn set_followed_hashtags(&self, hashtags: Vec<String>) -> anyhow::Result<()> {
318 let timestamp = self.now();
319 let mut private = self.get_private().await?;
320
321 for (_key, field) in private.followed_hashtags.iter_mut() {
322 if field.value == Some(true) {
323 *field = VersionedField::tombstone(timestamp);
324 }
325 }
326 for hashtag in &hashtags {
327 private.followed_hashtags.insert(hashtag.clone(), VersionedField::new(true, timestamp));
328 }
329
330 self.put_private(&private).await
331 }
332
333 pub async fn set_followed_hashtag(&self, hashtag: String, is_followed: bool) -> anyhow::Result<()> {
334 let timestamp = self.now();
335 let mut private = self.get_private().await?;
336
337 let field = if is_followed {
338 VersionedField::new(true, timestamp)
339 } else {
340 VersionedField::tombstone(timestamp)
341 };
342 private.followed_hashtags.insert(hashtag, field);
343
344 self.put_private(&private).await
345 }
346
347 pub async fn get_content_thresholds(&self) -> anyhow::Result<HashMap<u8, u32>> {
352 let private = self.get_private().await?;
353 let thresholds: HashMap<u8, u32> = private.content_thresholds.iter()
354 .filter_map(|(key, field)| {
355 let threshold = field.value?;
356 Some((*key, threshold))
357 })
358 .collect();
359 Ok(thresholds)
360 }
361
362 pub async fn set_content_threshold(&self, feedback_type: u8, threshold: u32) -> anyhow::Result<()> {
363 let timestamp = self.now();
364 let mut private = self.get_private().await?;
365 private.content_thresholds.insert(feedback_type, VersionedField::new(threshold, timestamp));
366 self.put_private(&private).await
367 }
368
369 pub async fn set_content_thresholds(&self, thresholds: HashMap<u8, u32>) -> anyhow::Result<()> {
370 let timestamp = self.now();
371 let mut private = self.get_private().await?;
372 for (feedback_type, threshold) in thresholds {
373 private.content_thresholds.insert(feedback_type, VersionedField::new(threshold, timestamp));
374 }
375 self.put_private(&private).await
376 }
377
378 pub async fn get_skip_warnings_for_followed(&self) -> anyhow::Result<bool> {
383 let private = self.get_private().await?;
384 Ok(private.skip_warnings_for_followed.value.unwrap_or(false))
385 }
386
387 pub async fn set_skip_warnings_for_followed(&self, value: bool) -> anyhow::Result<()> {
388 let timestamp = self.now();
389 let mut private = self.get_private().await?;
390 private.skip_warnings_for_followed = VersionedField::new(value, timestamp);
391 self.put_private(&private).await
392 }
393}