//! # Top-level server orchestrator
//!
//! [`HashiverseServer`] is the server-binary analogue of
//! [`hashiverse_lib::client::hashiverse_client::HashiverseClient`]: the single struct
//! that wires together every subsystem the node needs to operate and hands them to
//! the inbound-request handler.
//!
//! What it owns:
//!
//! - **Identity** — a persisted [`hashiverse_lib::tools::server_id::ServerId`] is
//!   loaded from the [`crate::environment::environment::Environment`] or minted
//!   fresh with proof-of-work on first start.
//! - **Transport** — built via a `TransportFactory` (TLS in production, plain TCP
//!   or in-memory in tests) and bound to the port from
//!   [`crate::server::args::Args`].
//! - **DHT** — a [`crate::server::kademlia::kademlia::Kademlia`] populated from the
//!   persisted peer buckets at startup and kept up to date by the handler dispatch.
//! - **Caches** — [`crate::server::post_bundle_caching`] and
//!   [`crate::server::post_bundle_feedback_caching`] plus per-connection reply-salt
//!   and heal caches, all backed by `moka` with TTL/TTI eviction.
//! - **Replay protection** — a short-window set of observed request salts so an
//!   attacker can't replay a valid signed request back at us.
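//!
//! # Example
//!
//! A minimal sketch of how a server binary might wire this together. This is a
//! hypothetical setup, not the real entry point: it assumes a populated
//! [`RuntimeServices`], an [`EnvironmentFactory`] implementation, and parsed
//! [`Args`] are already in hand.
//!
//! ```ignore
//! // Construct the orchestrator: opens the environment, loads or mints the
//! // ServerId, binds the transport, and restores persisted peer buckets.
//! let server = HashiverseServer::new(runtime_services, environment_factory, args).await?;
//!
//! // Drive the dispatch and maintenance loops until the token is cancelled.
//! let cancellation_token = CancellationToken::new();
//! server.run(cancellation_token).await;
//! ```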

use bytes::Bytes;
use crate::environment::environment::{Environment, EnvironmentDimensions, EnvironmentFactory, CONFIG_KADEMLIA_PEER_BUCKETS, CONFIG_SERVER_ID};
use crate::server::args::Args;
use crate::server::kademlia::kademlia;
use crate::server::kademlia::kademlia::Kademlia;
use crate::server::post_bundle_caching::PostBundleCache;
use crate::server::post_bundle_feedback_caching::PostBundleFeedbackCache;
use hashiverse_lib::anyhow_assert_eq;
use hashiverse_lib::protocol::payload::payload::{AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, BootstrapV1, PayloadRequestKind, PayloadResponseKind, TrendingHashtagsFetchResponseV1};
use hashiverse_lib::protocol::peer::Peer;
use hashiverse_lib::protocol::rpc;
use hashiverse_lib::protocol::rpc::rpc_response::RpcResponsePacketRx;
use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
use hashiverse_lib::tools::json;
use hashiverse_lib::tools::runtime_services::RuntimeServices;
use hashiverse_lib::tools::server_id::ServerId;
use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_MINUTE, MILLIS_IN_SECOND};
use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
use hashiverse_lib::tools::types::{Id, Salt};
use hashiverse_lib::tools::{config, tools};
use hashiverse_lib::transport::transport::{IncomingRequest, TransportServer};
use log::{error, info, trace, warn};
use moka::sync::Cache;
use parking_lot::{Mutex, RwLock};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

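/// The long-lived state of a running node, constructed once by
/// [`HashiverseServer::new`] and shared behind an [`Arc`] with the inbound-request
/// handler and the background maintenance tasks.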
pub struct HashiverseServer {
    pub runtime_services: Arc<RuntimeServices>,
    pub environment: Arc<Environment>,
    pub server_id: ServerId,
    pub kademlia: Arc<RwLock<Kademlia<Id, Peer>>>,
    pub transport_server: Arc<dyn TransportServer>,
    pub peer_self: Arc<RwLock<Peer>>,
    pub heal_in_progress: Cache<Id, ()>,
    pub seen_salts: Cache<Salt, ()>,
    pub post_bundle_cache: PostBundleCache,
    pub post_bundle_feedback_cache: PostBundleFeedbackCache,
    pub trending_hashtags: Cache<String, HyperLogLog>,
    pub trending_hashtags_response_cache: Mutex<Option<(TimeMillis, TrendingHashtagsFetchResponseV1)>>,
}

impl HashiverseServer {
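    /// Opens the environment, loads our persisted [`ServerId`] (or mints a fresh one
    /// with proof-of-work), binds the transport, and restores any persisted peer
    /// buckets into a new [`Kademlia`].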
    pub async fn new(runtime_services: Arc<RuntimeServices>, environment_factory: Arc<dyn EnvironmentFactory>, args: Args) -> anyhow::Result<Arc<Self>> {
        let environment_dimensions = EnvironmentDimensions::default().with_max_size_bytes(args.max_post_database_size_megabytes * 1024 * 1024);
        let environment = environment_factory.open_next_available(environment_dimensions).await?;

        // let passphrase = passphrase::get_passphrase(args.passphrase_path);
        let config_server_id = environment.config_get_bytes(CONFIG_SERVER_ID)?;
        let server_id = match config_server_id {
            None => {
                let server_id = ServerId::new(runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, args.skip_pq_commitment_bytes, runtime_services.pow_generator.as_ref()).await?;
                environment.config_put_bytes(CONFIG_SERVER_ID, server_id.encode()?)?;
                info!("starting new server with server_id={}", server_id);
                server_id
            }
            Some(config_server_id) => {
                let server_id = ServerId::decode(config_server_id.as_ref())?;
                server_id.verify()?;
                info!("restarting existing server with server_id={}", server_id);
                server_id
            }
        };

        let transport_server = runtime_services.transport_factory.create_server(&args.base_path, args.port, args.force_local_network).await?;

        // Update the address in our Peer record and re-sign it
        let mut peer_self = server_id.to_peer(runtime_services.time_provider.as_ref())?;
        peer_self.address = transport_server.get_address().to_string();
        peer_self.sign(runtime_services.time_provider.as_ref(), &server_id.keys.signature_key)?;

        // Boot our kademlia
        let mut kademlia = Kademlia::<Id, Peer>::new(server_id.id, config::SERVER_KADEMLIA_MAX_PEERS_PER_BUCKET);
        {
            let now = runtime_services.time_provider.current_time_millis();

            // We always belong in our own kademlia
            kademlia.add_peer(peer_self.clone(), now)?;

            // Have we previously persisted our peer buckets?
            {
                let try_result: anyhow::Result<()> = try {
                    let peer_buckets = environment.config_get_struct::<Vec<Vec<Peer>>>(CONFIG_KADEMLIA_PEER_BUCKETS)?;
                    if let Some(peer_buckets) = peer_buckets {
                        for peer_bucket in peer_buckets {
                            for peer in peer_bucket {
                                kademlia.add_peer(peer, now)?;
                            }
                        }
                    }
                };

                if let Err(e) = try_result {
                    warn!("problem restoring persisted peer_buckets: {}", e);
                }
            }
        }

        info!("server_id={}", server_id);
        info!("peer_self={}", peer_self);

        let hashiverse_server = HashiverseServer {
            runtime_services,
            environment: Arc::new(environment),
            server_id,
            kademlia: Arc::new(RwLock::new(kademlia)),
            transport_server,
            peer_self: Arc::new(RwLock::new(peer_self)),
            heal_in_progress: Cache::builder().time_to_live(Duration::from_secs(60)).build(),
            seen_salts: Cache::builder().time_to_live(Duration::from_mins(5)).max_capacity(100_000).build(),
            post_bundle_cache: PostBundleCache::new(config::SERVER_POST_BUNDLE_CACHE_MAX_ORIGINATORS_PER_LOCATION, config::SERVER_POST_BUNDLE_CACHE_MAX_BYTES),
            post_bundle_feedback_cache: PostBundleFeedbackCache::new(config::SERVER_POST_BUNDLE_FEEDBACK_CACHE_MAX_BYTES),
            trending_hashtags: Cache::builder().max_capacity(256).build(),
            trending_hashtags_response_cache: Mutex::new(None),
        };

        Ok(Arc::new(hashiverse_server))
    }

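    /// Runs the server until `cancellation_token` fires: listens for inbound
    /// requests and dispatches them, while concurrently driving the environment
    /// and Kademlia maintenance loops.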
    pub async fn run(&self, cancellation_token: CancellationToken) {
        info!("server started");

        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);

        let res = tokio::try_join!(
            self.wrap_and_dispatch_network_envelopes(cancellation_token.clone(), rx),
            self.maintain_environment(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
            self.maintain_kademlia(cancellation_token.clone(), self.runtime_services.time_provider.clone()),
            self.transport_server.listen(cancellation_token.clone(), tx),
        );

        match res {
            Ok(_) => info!("server stopped"),
            Err(e) => error!("server stopped with error: {}", e),
        }
    }

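    /// Verifies `peer` and checks it has done the minimum proof-of-work before
    /// adding it to our Kademlia; peers that fail either check are logged and dropped.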
    pub async fn add_potential_peer_to_kademlia(&self, peer: Peer, time_millis: TimeMillis) {
        // Verify this peer
        let result = peer.verify();
        if let Err(e) = result {
            warn!("peer {} failed verification: {}", peer, e);
            return;
        }

        // Has this peer done enough work?
        if peer.pow_initial.pow < config::SERVER_KEY_POW_MIN {
            warn!("peer {} failed pow so not adding to our kademlia", peer);
            return;
        }

        let result = self.kademlia.write().add_peer(peer, time_millis);
        if let Err(e) = result {
            warn!("problem adding peer: {}", e);
        }
    }

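    /// Issues an RPC to a server we only know by address (no `Peer` record yet).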
    async fn rpc_server_unknown(&self, address: &String, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
        rpc::rpc::rpc_server_unknown(&self.runtime_services, &self.server_id.id, address, payload_request_kind, payload).await
    }

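    /// Issues an RPC to a peer whose `Peer` record we already hold.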
    async fn rpc_server_known(&self, destination_peer: &Peer, payload_request_kind: PayloadRequestKind, payload: Bytes) -> anyhow::Result<RpcResponsePacketRx> {
        rpc::rpc::rpc_server_known(&self.runtime_services, &self.server_id.id, destination_peer, payload_request_kind, payload).await
    }

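    /// Background loop: runs environment maintenance roughly once a minute until
    /// cancelled.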
    async fn maintain_environment(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
        loop {
            if cancellation_token.is_cancelled() {
                break;
            }

            let time_millis = time_provider.current_time_millis();

            self.environment.do_maintenance(&cancellation_token, time_millis).await?;

            tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_MINUTE, &cancellation_token).await;
        }

        Ok(())
    }

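    /// Background loop, woken every 30 seconds until cancelled: bootstraps while we
    /// have too few peers, periodically announces ourselves to selected peers, and
    /// persists the peer buckets whenever they have changed.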
    async fn maintain_kademlia(&self, cancellation_token: CancellationToken, time_provider: Arc<dyn TimeProvider>) -> Result<(), anyhow::Error> {
        let mut last_bootstrap = TimeMillis::zero();
        let mut last_announce = TimeMillis::zero();
        let mut last_peers_dump_to_storage = self.runtime_services.time_provider.current_time_millis();

        loop {
            if cancellation_token.is_cancelled() {
                break;
            }

            let now = time_provider.current_time_millis();

            // Do we need to bootstrap?
            if now - last_bootstrap > config::MILLIS_TO_WAIT_BETWEEN_BOOTSTRAPS {
                last_bootstrap = now;

                let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };

                if needs_bootstrapping {
                    // Randomize these addresses so that the first one is not snowed under
                    let mut bootstrap_addresses = self.runtime_services.transport_factory.get_bootstrap_addresses().await;
                    tools::shuffle(&mut bootstrap_addresses);
                    trace!("bootstrap addresses: {:?}", bootstrap_addresses);

                    for bootstrap_address in bootstrap_addresses {
                        if cancellation_token.is_cancelled() {
                            break;
                        }

                        let try_result: anyhow::Result<()> = try {
                            trace!("bootstrapping {}", bootstrap_address);
                            let rpc_response_packet_rx = self.rpc_server_unknown(&bootstrap_address, PayloadRequestKind::BootstrapV1, json::struct_to_bytes(&BootstrapV1 {})?).await?;
                            anyhow_assert_eq!(&PayloadResponseKind::BootstrapResponseV1, &rpc_response_packet_rx.response_request_kind);
                            let response = json::bytes_to_struct::<BootstrapResponseV1>(&rpc_response_packet_rx.bytes)?;
                            for peer in response.peers_random {
                                self.add_potential_peer_to_kademlia(peer, now).await;
                            }
                        };

                        if let Err(e) = try_result {
                            warn!("problem bootstrapping {}: {}", bootstrap_address, e);
                        }

                        let needs_bootstrapping = { self.kademlia.read().len() < config::MINIMUM_PEERS_TO_STOP_BOOTSTRAPPING };
                        if !needs_bootstrapping {
                            break;
                        }
                    }

                    trace!("We now have {} peers", self.kademlia.read().len());
                }
            }

            // Do we need to announce?
            if now - last_announce > config::MILLIS_TO_WAIT_BETWEEN_ANNOUNCES {
                last_announce = now;

                let peer_self = self.peer_self.read().clone();

                // Pick some candidates that we will poke
                let mut announce_peers = Vec::<Peer>::new();
                {
                    let kademlia = self.kademlia.read();

                    // One that is potentially dead
                    let peer_with_lowest_score = kademlia.get_peer_with_lowest_score();
                    if let Some(peer_with_lowest_score) = peer_with_lowest_score {
                        announce_peers.push(peer_with_lowest_score.clone());
                    }

                    // Someone in our vicinity, so that our neighborhood becomes more tightly knit
                    let (peers_nearest, _) = kademlia.get_peers_for_key(&peer_self.id, 8);
                    if !peers_nearest.is_empty() {
                        announce_peers.push(tools::random_element(&peers_nearest).clone());
                    }
                }
                // Announce ourselves to each candidate
                for announce_peer in announce_peers {
                    if cancellation_token.is_cancelled() {
                        break;
                    }

                    // If we have to announce to ourselves, then simply push our own freshest peer
                    if announce_peer == peer_self {
                        // trace!("We are our own oldest peer!");
                        self.add_potential_peer_to_kademlia(peer_self.clone(), now).await;
                        continue;
                    }

                    let try_result: anyhow::Result<()> = try {
                        // trace!("announcing ourselves to {}", announce_peer);

                        let rpc_response_packet_rx = self.rpc_server_known(&announce_peer, PayloadRequestKind::AnnounceV1, json::struct_to_bytes(&AnnounceV1 { peer_self: peer_self.clone() })?).await?;
                        anyhow_assert_eq!(&PayloadResponseKind::AnnounceResponseV1, &rpc_response_packet_rx.response_request_kind);
                        let response = json::bytes_to_struct::<AnnounceResponseV1>(&rpc_response_packet_rx.bytes)?;
                        self.add_potential_peer_to_kademlia(response.peer_self, now).await;
                        for peer in response.peers_nearest {
                            self.add_potential_peer_to_kademlia(peer, now).await;
                        }
                    };

                    if let Err(e) = try_result {
                        warn!("problem announcing {}: {}", announce_peer, e);
                        self.kademlia.write().remove_peer(&announce_peer.id, now);
                    }
                }
            }

            // Do we need to persist our peer buckets?
            {
                let try_result: anyhow::Result<()> = try {
                    let kademlia = self.kademlia.read();
                    if last_peers_dump_to_storage < kademlia.peers_last_changed() && now - last_peers_dump_to_storage > config::MILLIS_TO_WAIT_BETWEEN_PEER_DUMPS {
                        last_peers_dump_to_storage = kademlia.peers_last_changed();
                        let peer_buckets = kademlia.get_peer_buckets();
                        // let total_peers: usize = peer_buckets.iter().map(|peers| peers.len()).sum();
                        // trace!("persisting peer_buckets of length={}", total_peers);
                        self.environment.config_put_struct(CONFIG_KADEMLIA_PEER_BUCKETS, &peer_buckets)?;
                    }
                };

                if let Err(e) = try_result {
                    warn!("problem persisting peer_buckets: {}", e);
                }
            }

            tools::cancellable_sleep_millis(self.runtime_services.time_provider.as_ref(), MILLIS_IN_SECOND.const_mul(30), &cancellation_token).await;
        }

        Ok(())
    }
}

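/// Lets the protocol's [`Peer`] records participate in [`Kademlia`] routing,
/// scored by their decayed proof-of-work.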
impl kademlia::Peer<Id> for Peer {
    fn id(&self) -> &Id {
        &self.id
    }
    fn score(&self, time_millis: TimeMillis) -> f64 {
        // This score favors peers that are currently active, but also rewards peers that have been active for a while
        self.pow_current_day.pow_decayed_day(time_millis) + self.pow_current_month.pow_decayed_month(time_millis)
    }
}