// hashiverse_lib/transport/transport.rs
//! # Transport abstraction — the seam between protocol and network
//!
//! Three traits define everything the protocol code needs from a transport, with
//! concrete implementations (HTTPS in production, in-memory for tests) supplied by
//! sibling modules:
//!
//! - [`TransportFactory`] — builds a [`TransportServer`] and issues outbound client
//!   requests. Injected via [`crate::tools::runtime_services::RuntimeServices`] so the
//!   same protocol code runs over HTTPS on native, over HTTP via `gloo-net` in WASM,
//!   and over a deterministic in-memory queue in tests.
//! - [`TransportServer`] — binds an address, delivers inbound requests over an
//!   `mpsc::Receiver<IncomingRequest>`, and shuts down on a `CancellationToken`.
//! - [`TransportServerHandler`] — the application-level processor of incoming requests,
//!   implemented by the server binary.
//!
//! [`IncomingRequest`] wraps one inbound request: the raw bytes, the caller's address,
//! a oneshot response channel, and a
//! [`crate::transport::ddos::ddos::DdosConnectionGuard`] whose drop releases the
//! caller's per-IP connection slot — so request-level DDoS accounting is automatic and
//! can't be leaked.

22use crate::tools::BytesGatherer;
23use crate::transport::ddos::ddos::DdosConnectionGuard;
24use bytes::Bytes;
25use log::{info, trace, warn};
26use std::sync::Arc;
27use tokio::sync::{mpsc, oneshot};
28use tokio_util::sync::CancellationToken;
29
/// A single request delivered from a [`TransportServer`] to the
/// application-level [`TransportServerHandler`] for processing.
///
/// The transport layer owns inbound sockets, parses wire-level framing, and forwards the
/// decoded payload to the handler as an `IncomingRequest`. It carries the raw request bytes
/// plus the metadata the handler may need to reason about: the caller's address (for logging
/// and DDoS accounting), a oneshot channel on which to send the response, and an
/// [`DdosConnectionGuard`] that ties the request's lifetime to its underlying connection slot
/// — dropping the guard releases the per-IP connection count.
///
/// Handlers do not construct `IncomingRequest` directly; they `.await` them off the
/// `mpsc::Receiver` passed to [`TransportServerHandler::run`].
pub struct IncomingRequest {
    /// Remote peer address as reported by the transport; used for logging.
    pub caller_address: String,
    /// The decoded request payload, with wire-level framing already stripped.
    pub bytes: Bytes,
    /// One-shot channel the handler's response is sent back on; per tokio oneshot
    /// semantics, dropping it unsent surfaces as a send/recv error on the other side.
    pub reply: oneshot::Sender<BytesGatherer>,
    /// Kept private so only the transport layer manages it; its `Drop` releases the
    /// caller's per-IP connection slot (see module docs).
    ddos_connection_guard: Arc<DdosConnectionGuard>,
}
48
49impl IncomingRequest {
50    pub fn new(caller_address: String, bytes: Bytes, reply: oneshot::Sender<BytesGatherer>, ddos_connection_guard: Arc<DdosConnectionGuard>) -> Self {
51        Self { caller_address, bytes, reply, ddos_connection_guard }
52    }
53
54    pub fn report_bad_request(&self) {
55        self.ddos_connection_guard.report_bad_request();
56    }
57}
58
/// The lifecycle state of a [`TransportServer`].
///
/// A server starts in `Created`, transitions to `Listening` when [`TransportServer::listen`]
/// is called for the first time, and moves to `Shutdown` when its cancellation token fires.
/// Implementations use this to reject double-start (`Listening` → `Listening`) and
/// use-after-shutdown (`Shutdown` → anything) errors cleanly rather than silently racing.
// `Eq` derived alongside `PartialEq`: equality on this fieldless enum is total, and the
// derive costs nothing while letting the type satisfy `T: Eq` bounds
// (clippy::derive_partial_eq_without_eq).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    Created,
    Listening,
    Shutdown,
}
71
72/// The application-level request handler that sits behind a [`TransportServer`].
73///
74/// Implementors receive decoded request bytes and produce response bytes without caring
75/// about sockets, framing, or connection lifecycle — all of that stays in the transport
76/// layer. The default [`TransportServerHandler::run`] implementation is the canonical event
77/// loop: it reads [`IncomingRequest`]s off a channel, dispatches each one to
78/// [`TransportServerHandler::handle`], and ships the response back to the originating client.
79/// Shutdown is cooperative via a [`CancellationToken`].
80///
81/// The server binary's RPC dispatcher and the client's own loopback handler both implement
82/// this trait, which lets them share the same event-loop plumbing.
83pub trait TransportServerHandler {
84    async fn handle(&self, bytes: Bytes) -> BytesGatherer;
85
86    async fn run(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> anyhow::Result<()> {
87        loop {
88            tokio::select! {
89                _ = cancellation_token.cancelled() => {
90                    break;
91                },
92
93                receipt = rx.recv() => {
94                    match receipt {
95                            Some(incoming) => {
96                                info!("received packet from {}: {:?}", incoming.caller_address, incoming.bytes);
97                                let result = self.handle(incoming.bytes.clone()).await;
98                                let result = incoming.reply.send(result);
99                                match result {
100                                    Ok(_) => { trace!("sent reply"); },
101                                    Err(_) => { warn!("failed to send reply"); },
102                                }
103                            },
104                            None => {
105                                warn!("channel closed");
106                                break;
107                            }
108                        }
109                },
110            }
111        }
112
113        Ok(())
114    }
115}
116
/// A server-side endpoint that accepts inbound connections and forwards each request to a
/// handler via an mpsc channel.
///
/// `TransportServer` abstracts the concrete listening strategy — `MemTransportServer` for
/// in-memory tests, the HTTPS implementation in the server crate for production, a browser
/// stub in the wasm client that panics on `listen`. All of them expose the same two-operation
/// surface: report where they can be reached (`get_address`) and run the accept loop until
/// the supplied `CancellationToken` fires (`listen`).
///
/// A `TransportServer` instance is single-shot — once `listen` has completed (cancelled or
/// errored) the server transitions to [`ServerState::Shutdown`] and must not be re-used.
#[async_trait::async_trait]
pub trait TransportServer: Send + Sync {
    /// The address clients should dial (via [`TransportFactory::rpc`]) to reach this
    /// server; fixed once the server has been created.
    // NOTE(review): `&str` would be the idiomatic borrow here, but changing the trait
    // signature breaks every implementor — confirm all call sites before migrating.
    fn get_address(&self) -> &String;

    /// Runs the accept loop, sending each decoded inbound request into `handler`.
    /// Completes when `cancellation_token` fires or the listener fails; afterwards the
    /// server is spent and must not be listened on again.
    async fn listen(&self, cancellation_token: CancellationToken, handler: mpsc::Sender<IncomingRequest>) -> anyhow::Result<()>;
}
134
/// The pluggable network layer of the protocol — the single point where the crate touches
/// "how do we move bytes around the world".
///
/// A `TransportFactory` knows three things: (1) where to find the network's bootstrap peers
/// for initial peer discovery, (2) how to create a [`TransportServer`] that listens on a given
/// port, and (3) how to perform an outbound unary RPC against a peer address. Everything
/// above this layer — the RPC packet framing, PoW, peer tracking, Kademlia — is network-
/// agnostic and simply calls [`TransportFactory::rpc`].
///
/// Concrete implementations include the in-memory `MemTransportFactory` used by integration
/// tests, `FullHttpsTransportFactory` in the server crate for production TLS+HTTPS, and
/// `WasmTransportFactory` in the browser client that speaks HTTP via `gloo-net`. Swapping
/// the factory on [`crate::tools::runtime_services::RuntimeServices`] changes the wire protocol
/// without any other code having to care.
#[async_trait::async_trait]
pub trait TransportFactory: Send + Sync {
    /// Addresses of the network's bootstrap peers, used for initial peer discovery.
    async fn get_bootstrap_addresses(&self) -> Vec<String>;

    /// Creates a [`TransportServer`] rooted at `base_path`, listening on `port`
    /// (0 requests an ephemeral port — see the tests in this module).
    // NOTE(review): exact semantics of `force_local_network` are implementation-defined;
    // the generic tests pass `true` to keep traffic off the real network — confirm per impl.
    async fn create_server(&self, base_path: &str, port: u16, force_local_network: bool) -> anyhow::Result<Arc<dyn TransportServer>>;

    /// Performs one outbound unary RPC: sends `bytes` to `address` and returns the
    /// peer's response bytes.
    async fn rpc(&self, address: &str, bytes: Bytes) -> anyhow::Result<Bytes>;
}
155
#[cfg(any(test, feature = "generic-tests"))]
pub mod tests {
    //! Generic, factory-parameterised transport tests, shared by every concrete
    //! [`TransportFactory`] implementation (in-memory, HTTPS, …).

    use crate::tools::time::{MILLIS_IN_MILLISECOND, MILLIS_IN_SECOND};
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::tools::get_temp_dir;
    use crate::tools::BytesGatherer;
    use crate::transport::transport::{IncomingRequest, TransportFactory, TransportServerHandler};
    use bytes::Bytes;
    use log::{info, trace};
    use std::sync::Arc;
    use tokio::join;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    /// Trivial handler shared by both tests: ignores the request payload and always
    /// replies with the same canned message. (Hoisted to module scope — it was
    /// previously duplicated verbatim inside each test function.)
    struct MyHandler {}

    impl TransportServerHandler for MyHandler {
        async fn handle(&self, _: Bytes) -> BytesGatherer {
            BytesGatherer::from_bytes(Bytes::from("here is the reply"))
        }
    }

    /// End-to-end round trip: start one server, fire 20 RPCs at it, check every reply,
    /// then shut everything down via the shared cancellation token.
    pub async fn rpc_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider::default());
        // Opt-in debugging aid — uncomment to see the full trace:
        // configure_logging_with_time_provider("trace", time_provider.clone());

        let cancellation_token = CancellationToken::new();
        let (_, temp_dir_str) = get_temp_dir()?;
        // Port 0 requests an ephemeral port; `force_local_network` keeps the test off
        // the real network.
        let transport_server = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let address = transport_server.get_address().clone();
        trace!("server address is {}", address);

        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);

        let my_handler = MyHandler {};

        info!("running server and clients in parallel");
        let results = join!(
            my_handler.run(cancellation_token.clone(), rx),
            transport_server.listen(cancellation_token.clone(), tx),
            // The driver process
            async {
                info!("waiting for server to start");
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                for _ in 0..20 {
                    info!("calling server");
                    let bytes = Bytes::from("hello");
                    let response = transport_factory.rpc(&address, bytes).await.unwrap();
                    assert_eq!(response, Bytes::from("here is the reply"));
                    time_provider.sleep_millis(MILLIS_IN_MILLISECOND.const_mul(100)).await;
                }

                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                Ok::<(), anyhow::Error>(())
            }
        );

        // Check all the processes exited happily; carry the error into the panic
        // message so a failure is diagnosable from logs.
        assert!(results.0.is_ok(), "handler loop failed: {:?}", results.0);
        assert!(results.1.is_ok(), "server listen failed: {:?}", results.1);
        assert!(results.2.is_ok(), "driver failed: {:?}", results.2);

        Ok(())
    }

    /// Binding two servers with port 0 must yield two independent listeners — i.e. the
    /// ephemeral-port path must not clash or double-bind.
    pub async fn bind_port_zero_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider::default());
        // Opt-in debugging aid — uncomment to see the full trace:
        // configure_logging_with_time_provider("trace", time_provider.clone());

        info!("starting test");

        let cancellation_token = CancellationToken::new();
        let (_, temp_dir_str) = get_temp_dir()?;
        let transport_server_1 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let transport_server_2 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;

        let (tx_1, rx_1) = mpsc::channel::<IncomingRequest>(32);
        let (tx_2, rx_2) = mpsc::channel::<IncomingRequest>(32);

        let my_handler = MyHandler {};

        info!("running server and clients in parallel");
        let results = join!(
            my_handler.run(cancellation_token.clone(), rx_1),
            my_handler.run(cancellation_token.clone(), rx_2),
            transport_server_1.listen(cancellation_token.clone(), tx_1),
            transport_server_2.listen(cancellation_token.clone(), tx_2),
            // The driver process
            async {
                info!("waiting for server to start");
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                Ok::<(), anyhow::Error>(())
            }
        );

        // Check all the processes exited happily; carry the error into the panic
        // message so a failure is diagnosable from logs.
        assert!(results.0.is_ok(), "handler loop 1 failed: {:?}", results.0);
        assert!(results.1.is_ok(), "handler loop 2 failed: {:?}", results.1);
        assert!(results.2.is_ok(), "server 1 listen failed: {:?}", results.2);
        assert!(results.3.is_ok(), "server 2 listen failed: {:?}", results.3);
        assert!(results.4.is_ok(), "driver failed: {:?}", results.4);

        Ok(())
    }
}
275}