Skip to main content

hashiverse_lib/client/meta_post/
meta_post_manager.rs

1//! # Meta-post lifecycle orchestration
2//!
3//! The glue between [`crate::client::meta_post::meta_post`] (data model) and the rest of
4//! the client. `MetaPostManager`:
5//!
6//! - loads the current profile from `BUCKET_CONFIG` on startup,
7//! - merges incoming meta-post updates (from other devices or from the network) via the
8//!   per-field `VersionedField` CRDT rule,
9//! - writes updates back to storage, and
10//! - re-publishes the account's meta-post to the network if the current month's User
11//!   bucket no longer contains a fresh enough copy — so following clients and other
12//!   devices can discover current settings without every profile edit triggering an
13//!   immediate post.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use log::{info, warn};
18use crate::client::client_storage::client_storage::{self as cs, ClientStorage, BUCKET_META_POST_PUBLIC, BUCKET_CONFIG, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE};
19use crate::client::key_locker::key_locker::KeyLocker;
20use crate::client::meta_post::meta_post::{MetaPost, MetaPostPrivateV1, MetaPostPublicV1, MetaPostV1, VersionedField, merge_public, merge_private};
21use crate::client::meta_post::meta_post_crypto;
22use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
23use crate::protocol::posting::encoded_post::EncodedPostV1;
24use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketType};
25use crate::tools::client_id::ClientId;
26use crate::tools::json;
27use crate::tools::runtime_services::RuntimeServices;
28use crate::tools::time::TimeMillis;
29use crate::tools::types::{Id, Salt};
30
31/// Owns all of a user's config state — profile bio, follows, content thresholds, skip-warnings,
32/// etc. — backed by a `MetaPostV1` envelope that can be published to the network.
33///
34/// ## Storage
35///
36/// Public and private config are stored as two separate keys in `BUCKET_CONFIG`:
37/// - `{client_id}.meta_post_v1_public`  → `MetaPostPublicV1`
38/// - `{client_id}.meta_post_v1_private` → `MetaPostPrivateV1`
39///
40/// `MetaPostV1` is used strictly as the network envelope — it is assembled from the two
41/// stored sections at publish time and disassembled on receive. Because every field carries
42/// its own `TimeMillis` timestamp, concurrent edits from multiple devices merge cleanly
43/// via last-writer-wins.
44///
45/// ## Publishing policy
46///
47/// Local config changes (follows, thresholds, skip_warnings, bio) are written to
48/// `BUCKET_CONFIG` immediately but are **not** published to the network on every change —
49/// that would spam the network whenever a user toggles follows or adjusts sliders. A
50/// `MetaPostV1` is published to the network in exactly two situations:
51///
52/// 1. **Explicit publish** — the user saves their bio in the UI, which calls
53///    `build_meta_post_json()` followed by `submit_post()` on the client. This bundles the
54///    latest config (including any pending follow/threshold changes) into the post.
55/// 2. **Auto-publish on startup** — `should_auto_publish()` checks whether the current
56///    month's User bucket already contains a `MetaPostV1` from this client. If not (and the
57///    bucket isn't full), the client publishes one so other users and the user's own
58///    devices can always find an up-to-date config within a reasonable time window.
59pub struct MetaPostManager {
60    runtime_services: Arc<RuntimeServices>,
61    client_storage: Arc<dyn ClientStorage>,
62    key_locker: Arc<dyn KeyLocker>,
63    client_id: ClientId,
64}
65
66impl MetaPostManager {
67    pub fn new(
68        runtime_services: Arc<RuntimeServices>,
69        client_storage: Arc<dyn ClientStorage>,
70        key_locker: Arc<dyn KeyLocker>,
71        client_id: ClientId,
72    ) -> Self {
73        Self { runtime_services, client_storage, key_locker, client_id }
74    }
75
76    fn now(&self) -> TimeMillis {
77        self.runtime_services.time_provider.current_time_millis()
78    }
79
80    fn public_config_key(&self) -> String {
81        cs::config_key_for_user(self.client_id.id, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC)
82    }
83
84    fn private_config_key(&self) -> String {
85        cs::config_key_for_user(self.client_id.id, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE)
86    }
87
88    // ------------------------------------------------------------------
89    // Config read/write
90    // ------------------------------------------------------------------
91
92    pub async fn get_public(&self) -> anyhow::Result<MetaPostPublicV1> {
93        let public = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &self.public_config_key(), TimeMillis::zero()).await?;
94        Ok(public.unwrap_or_else(MetaPostPublicV1::empty))
95    }
96
97    async fn put_public(&self, public: &MetaPostPublicV1) -> anyhow::Result<()> {
98        cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &self.public_config_key(), public, TimeMillis::zero()).await
99    }
100
101    pub async fn get_private(&self) -> anyhow::Result<MetaPostPrivateV1> {
102        let private = cs::get_struct::<MetaPostPrivateV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &self.private_config_key(), TimeMillis::zero()).await?;
103        Ok(private.unwrap_or_else(MetaPostPrivateV1::empty))
104    }
105
106    async fn put_private(&self, private: &MetaPostPrivateV1) -> anyhow::Result<()> {
107        cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &self.private_config_key(), private, TimeMillis::zero()).await
108    }
109
110    // ------------------------------------------------------------------
111    // Bio (public section)
112    // ------------------------------------------------------------------
113
114    /// Update the public bio fields in the local config.
115    /// Does NOT automatically publish to the network.
116    pub async fn set_bio(&self, nickname: String, status: String, selfie: String, avatar: String) -> anyhow::Result<()> {
117        info!("set_bio: nickname={}, status={}, selfie={}, avatar={}", nickname, status, selfie, avatar);
118
119        let time_millis = self.now();
120        let local_public = self.get_public().await?;
121        let incoming_public = MetaPostPublicV1::from_bio(time_millis, nickname, status, selfie, avatar);
122        let merged_public = merge_public(&local_public, &incoming_public);
123
124        // Store in our client_storage::BUCKET_CONFIG
125        self.put_public(&merged_public).await?;
126
127        // Store in our client_storage::BUCKET_META_POST_PUBLIC as if we already received this from the network so local get_meta_post_public() works immediately
128        cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &self.client_id.id.to_hex_str(), &merged_public, time_millis).await?;
129
130        Ok(())
131    }
132
133    pub async fn get_meta_post_public(&self, id: Id) -> anyhow::Result<Option<MetaPostPublicV1>> {
134        let meta_post_public = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id.to_hex_str(), self.now()).await?;
135        Ok(meta_post_public)
136    }
137
138    pub async fn get_all_meta_post_publics(&self) -> anyhow::Result<Vec<(String, MetaPostPublicV1)>> {
139        let mut meta_post_publics = Vec::new();
140        let keys = self.client_storage.keys(BUCKET_META_POST_PUBLIC).await?;
141        for key in keys {
142            let result: anyhow::Result<()> = try {
143                if let Some(meta_post_public) = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &key, TimeMillis::zero()).await? {
144                    meta_post_publics.push((key.clone(), meta_post_public));
145                }
146            };
147            if let Err(e) = result {
148                warn!("Failed to decode MetaPostPublicV1 with key {}: {}", key, e);
149            }
150        }
151        Ok(meta_post_publics)
152    }
153
154    // ------------------------------------------------------------------
155    // Publishing
156    // ------------------------------------------------------------------
157
158    /// Build the MetaPostV1 JSON string ready for `submit_post()`.
159    /// Also updates the local BUCKET_META_POST_PUBLIC cache.
160    pub async fn build_meta_post_json(&self) -> anyhow::Result<String> {
161        info!("build_meta_post_json");
162
163        let public = self.get_public().await?;
164        let private = self.get_private().await?;
165        let salt = Salt::random();
166        let private_encrypted = meta_post_crypto::encrypt_private_section(self.key_locker.as_ref(), &salt, &private).await?;
167
168        let meta_post = MetaPostV1::new(
169            self.client_id.id_hex(),
170            salt,
171            public.clone(),
172            private_encrypted,
173        );
174
175        // Update local meta post public cache
176        cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &self.client_id.id.to_hex_str(), &public, self.now()).await?;
177
178        json::struct_to_string(&meta_post)
179    }
180
181    /// Check whether the current month's User bucket already contains a
182    /// MetaPostV1 from this client.  Returns `true` if we should publish.
183    pub async fn should_auto_publish(&self, post_bundle_manager: &dyn PostBundleManager) -> anyhow::Result<bool> {
184        let timestamp = self.now();
185
186        let bucket_durations = bucket_durations_for_type(BucketType::User);
187        let bucket_duration = bucket_durations.first().ok_or_else(|| anyhow::anyhow!("No bucket durations for User type"))?;
188        let bucket_location = generate_bucket_location(BucketType::User, self.client_id.id, *bucket_duration, timestamp)?;
189
190        let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;
191
192        if post_bundle.header.overflowed || post_bundle.header.sealed {
193            info!("Current bucket is full/sealed, skipping auto-publish of MetaPostV1");
194            return Ok(false);
195        }
196
197        let mut offset = 0;
198        for i in 0..(post_bundle.header.num_posts as usize) {
199            let len = post_bundle.header.encoded_post_lengths[i];
200            let post_bytes = post_bundle.encoded_posts_bytes.slice(offset..offset + len);
201            offset += len;
202
203            let decode_result = EncodedPostV1::decode_from_bytes(post_bytes, &bucket_location.base_id, true, true);
204            if let Ok(encoded_post) = decode_result {
205                if let Ok(post_client_id) = encoded_post.header.client_id() {
206                    if post_client_id.id == self.client_id.id {
207                        if let Ok(MetaPost::MetaPostV1(_)) = MetaPost::try_parse_meta_post(&encoded_post.post) {
208                            info!("MetaPostV1 already exists in current bucket, skipping auto-publish");
209                            return Ok(false);
210                        }
211                    }
212                }
213            }
214        }
215
216        info!("No MetaPostV1 found in most-recent, most-granular bucket, should auto-publish");
217        Ok(true)
218    }
219
220    // ------------------------------------------------------------------
221    // Receiving
222    // ------------------------------------------------------------------
223
224    /// Process an incoming MetaPostV1 from the network.
225    /// Caches the public section for any user; decrypts and merges the private section
226    /// if the post is from our own client_id.
227    pub async fn process_incoming_meta_post(&self, meta_post_v1: &MetaPostV1, post_client_id: &ClientId) -> anyhow::Result<()> {
228        let id = post_client_id.id.to_hex_str();
229        let now = self.now();
230
231        // Always cache the public section for any user
232        let incoming_timestamp = meta_post_v1.public.max_timestamp();
233        let existing_timestamp = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id, now).await?.map(|b| b.max_timestamp());
234        if existing_timestamp.is_none_or(|t| incoming_timestamp > t) {
235            cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &id, &meta_post_v1.public, now).await?;
236        }
237
238        // If this is our own post, decrypt and merge both sections
239        if post_client_id.id == self.client_id.id {
240            // Merge public
241            let local_public = self.get_public().await?;
242            let merged_public = merge_public(&local_public, &meta_post_v1.public);
243            self.put_public(&merged_public).await?;
244
245            // Merge private
246            match meta_post_crypto::decrypt_private_section(self.key_locker.as_ref(), &meta_post_v1.encryption_salt, &meta_post_v1.private_encrypted).await {
247                Ok(incoming_private) => {
248                    let local_private = self.get_private().await?;
249                    let merged_private = merge_private(&local_private, &incoming_private, now);
250                    self.put_private(&merged_private).await?;
251                }
252                Err(e) => {
253                    warn!("Failed to decrypt own MetaPostV1 private section: {}", e);
254                }
255            }
256        }
257
258        Ok(())
259    }
260
261    // ------------------------------------------------------------------
262    // Followed users
263    // ------------------------------------------------------------------
264
265    pub async fn get_followed_client_ids(&self) -> anyhow::Result<Vec<Id>> {
266        let private = self.get_private().await?;
267        let client_ids: Vec<Id> = private.followed_client_ids.iter()
268            .filter(|(_key, field)| field.value == Some(true))
269            .map(|(key, _field)| Id::from_hex_str(key))
270            .collect::<anyhow::Result<Vec<_>>>()?;
271        Ok(client_ids)
272    }
273
274    pub async fn set_followed_client_ids(&self, client_ids: Vec<Id>) -> anyhow::Result<()> {
275        let timestamp = self.now();
276        let mut private = self.get_private().await?;
277
278        for (_key, field) in private.followed_client_ids.iter_mut() {
279            if field.value == Some(true) {
280                *field = VersionedField::tombstone(timestamp);
281            }
282        }
283        for client_id in &client_ids {
284            private.followed_client_ids.insert(client_id.to_hex_str(), VersionedField::new(true, timestamp));
285        }
286
287        self.put_private(&private).await
288    }
289
290    pub async fn set_followed_client_id(&self, client_id: Id, is_followed: bool) -> anyhow::Result<()> {
291        let timestamp = self.now();
292        let mut private = self.get_private().await?;
293
294        let field = if is_followed {
295            VersionedField::new(true, timestamp)
296        } else {
297            VersionedField::tombstone(timestamp)
298        };
299        private.followed_client_ids.insert(client_id.to_hex_str(), field);
300
301        self.put_private(&private).await
302    }
303
304    // ------------------------------------------------------------------
305    // Followed hashtags
306    // ------------------------------------------------------------------
307
308    pub async fn get_followed_hashtags(&self) -> anyhow::Result<Vec<String>> {
309        let private = self.get_private().await?;
310        let hashtags: Vec<String> = private.followed_hashtags.iter()
311            .filter(|(_key, field)| field.value == Some(true))
312            .map(|(key, _field)| key.clone())
313            .collect();
314        Ok(hashtags)
315    }
316
317    pub async fn set_followed_hashtags(&self, hashtags: Vec<String>) -> anyhow::Result<()> {
318        let timestamp = self.now();
319        let mut private = self.get_private().await?;
320
321        for (_key, field) in private.followed_hashtags.iter_mut() {
322            if field.value == Some(true) {
323                *field = VersionedField::tombstone(timestamp);
324            }
325        }
326        for hashtag in &hashtags {
327            private.followed_hashtags.insert(hashtag.clone(), VersionedField::new(true, timestamp));
328        }
329
330        self.put_private(&private).await
331    }
332
333    pub async fn set_followed_hashtag(&self, hashtag: String, is_followed: bool) -> anyhow::Result<()> {
334        let timestamp = self.now();
335        let mut private = self.get_private().await?;
336
337        let field = if is_followed {
338            VersionedField::new(true, timestamp)
339        } else {
340            VersionedField::tombstone(timestamp)
341        };
342        private.followed_hashtags.insert(hashtag, field);
343
344        self.put_private(&private).await
345    }
346
347    // ------------------------------------------------------------------
348    // Content thresholds
349    // ------------------------------------------------------------------
350
351    pub async fn get_content_thresholds(&self) -> anyhow::Result<HashMap<u8, u32>> {
352        let private = self.get_private().await?;
353        let thresholds: HashMap<u8, u32> = private.content_thresholds.iter()
354            .filter_map(|(key, field)| {
355                let threshold = field.value?;
356                Some((*key, threshold))
357            })
358            .collect();
359        Ok(thresholds)
360    }
361
362    pub async fn set_content_threshold(&self, feedback_type: u8, threshold: u32) -> anyhow::Result<()> {
363        let timestamp = self.now();
364        let mut private = self.get_private().await?;
365        private.content_thresholds.insert(feedback_type, VersionedField::new(threshold, timestamp));
366        self.put_private(&private).await
367    }
368
369    pub async fn set_content_thresholds(&self, thresholds: HashMap<u8, u32>) -> anyhow::Result<()> {
370        let timestamp = self.now();
371        let mut private = self.get_private().await?;
372        for (feedback_type, threshold) in thresholds {
373            private.content_thresholds.insert(feedback_type, VersionedField::new(threshold, timestamp));
374        }
375        self.put_private(&private).await
376    }
377
378    // ------------------------------------------------------------------
379    // Skip warnings for followed
380    // ------------------------------------------------------------------
381
382    pub async fn get_skip_warnings_for_followed(&self) -> anyhow::Result<bool> {
383        let private = self.get_private().await?;
384        Ok(private.skip_warnings_for_followed.value.unwrap_or(false))
385    }
386
387    pub async fn set_skip_warnings_for_followed(&self, value: bool) -> anyhow::Result<()> {
388        let timestamp = self.now();
389        let mut private = self.get_private().await?;
390        private.skip_warnings_for_followed = VersionedField::new(value, timestamp);
391        self.put_private(&private).await
392    }
393}