hashiverse_lib/transport/transport.rs
1//! # Transport abstraction — the seam between protocol and network
2//!
3//! Three traits define everything the protocol code needs from a transport, with
4//! concrete implementations (HTTPS in production, in-memory for tests) supplied by
5//! sibling modules:
6//!
7//! - [`TransportFactory`] — builds a [`TransportServer`] and issues outbound client
8//! requests. Injected via [`crate::tools::runtime_services::RuntimeServices`] so the
9//! same protocol code runs over HTTPS on native, over HTTP via `gloo-net` in WASM,
10//! and over a deterministic in-memory queue in tests.
11//! - [`TransportServer`] — binds an address, delivers inbound requests over an
12//! `mpsc::Receiver<IncomingRequest>`, and shuts down on a `CancellationToken`.
13//! - [`TransportServerHandler`] — the application-level processor of incoming requests,
14//! implemented by the server binary.
15//!
16//! [`IncomingRequest`] wraps one inbound request: the raw bytes, the caller's address,
17//! a oneshot response channel, and a
18//! [`crate::transport::ddos::ddos::DdosConnectionGuard`] whose drop releases the
19//! caller's per-IP connection slot — so request-level DDoS accounting is automatic and
20//! can't be leaked.
21
22use crate::tools::BytesGatherer;
23use crate::transport::ddos::ddos::DdosConnectionGuard;
24use bytes::Bytes;
25use log::{info, trace, warn};
26use std::sync::Arc;
27use tokio::sync::{mpsc, oneshot};
28use tokio_util::sync::CancellationToken;
29
30/// A single request delivered from a [`TransportServer`] to the
31/// application-level [`TransportServerHandler`] for processing.
32///
33/// The transport layer owns inbound sockets, parses wire-level framing, and forwards the
34/// decoded payload to the handler as an `IncomingRequest`. It carries the raw request bytes
35/// plus the metadata the handler may need to reason about: the caller's address (for logging
36/// and DDoS accounting), a oneshot channel on which to send the response, and an
37/// [`DdosConnectionGuard`] that ties the request's lifetime to its underlying connection slot
38/// — dropping the guard releases the per-IP connection count.
39///
40/// Handlers do not construct `IncomingRequest` directly; they `.await` them off the
41/// `mpsc::Receiver` passed to [`TransportServerHandler::run`].
42pub struct IncomingRequest {
43 pub caller_address: String,
44 pub bytes: Bytes,
45 pub reply: oneshot::Sender<BytesGatherer>,
46 ddos_connection_guard: Arc<DdosConnectionGuard>,
47}
48
49impl IncomingRequest {
50 pub fn new(caller_address: String, bytes: Bytes, reply: oneshot::Sender<BytesGatherer>, ddos_connection_guard: Arc<DdosConnectionGuard>) -> Self {
51 Self { caller_address, bytes, reply, ddos_connection_guard }
52 }
53
54 pub fn report_bad_request(&self) {
55 self.ddos_connection_guard.report_bad_request();
56 }
57}
58
/// The lifecycle state of a [`TransportServer`].
///
/// A server starts in `Created`, transitions to `Listening` when [`TransportServer::listen`]
/// is called for the first time, and moves to `Shutdown` when its cancellation token fires.
/// Implementations use this to reject double-start (`Listening` → `Listening`) and
/// use-after-shutdown (`Shutdown` → anything) errors cleanly rather than silently racing.
///
/// The enum is a plain fieldless value type: it is `Copy`, and it implements full
/// equivalence (`Eq`) in addition to `PartialEq`, so it can be used anywhere total
/// equality is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// Constructed, `listen` not yet called.
    Created,
    /// `listen` has been called and the server is accepting requests.
    Listening,
    /// The server has stopped and must not be re-used.
    Shutdown,
}
71
72/// The application-level request handler that sits behind a [`TransportServer`].
73///
74/// Implementors receive decoded request bytes and produce response bytes without caring
75/// about sockets, framing, or connection lifecycle — all of that stays in the transport
76/// layer. The default [`TransportServerHandler::run`] implementation is the canonical event
77/// loop: it reads [`IncomingRequest`]s off a channel, dispatches each one to
78/// [`TransportServerHandler::handle`], and ships the response back to the originating client.
79/// Shutdown is cooperative via a [`CancellationToken`].
80///
81/// The server binary's RPC dispatcher and the client's own loopback handler both implement
82/// this trait, which lets them share the same event-loop plumbing.
83pub trait TransportServerHandler {
84 async fn handle(&self, bytes: Bytes) -> BytesGatherer;
85
86 async fn run(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> anyhow::Result<()> {
87 loop {
88 tokio::select! {
89 _ = cancellation_token.cancelled() => {
90 break;
91 },
92
93 receipt = rx.recv() => {
94 match receipt {
95 Some(incoming) => {
96 info!("received packet from {}: {:?}", incoming.caller_address, incoming.bytes);
97 let result = self.handle(incoming.bytes.clone()).await;
98 let result = incoming.reply.send(result);
99 match result {
100 Ok(_) => { trace!("sent reply"); },
101 Err(_) => { warn!("failed to send reply"); },
102 }
103 },
104 None => {
105 warn!("channel closed");
106 break;
107 }
108 }
109 },
110 }
111 }
112
113 Ok(())
114 }
115}
116
117/// A server-side endpoint that accepts inbound connections and forwards each request to a
118/// handler via an mpsc channel.
119///
120/// `TransportServer` abstracts the concrete listening strategy — `MemTransportServer` for
121/// in-memory tests, the HTTPS implementation in the server crate for production, a browser
122/// stub in the wasm client that panics on `listen`. All of them expose the same two-operation
123/// surface: report where they can be reached (`get_address`) and run the accept loop until
124/// the supplied `CancellationToken` fires (`listen`).
125///
126/// A `TransportServer` instance is single-shot — once `listen` has completed (cancelled or
127/// errored) the server transitions to [`ServerState::Shutdown`] and must not be re-used.
128#[async_trait::async_trait]
129pub trait TransportServer: Send + Sync {
130 fn get_address(&self) -> &String;
131
132 async fn listen(&self, cancellation_token: CancellationToken, handler: mpsc::Sender<IncomingRequest>) -> anyhow::Result<()>;
133}
134
135/// The pluggable network layer of the protocol — the single point where the crate touches
136/// "how do we move bytes around the world".
137///
138/// A `TransportFactory` knows three things: (1) where to find the network's bootstrap peers
139/// for initial peer discovery, (2) how to create a [`TransportServer`] that listens on a given
140/// port, and (3) how to perform an outbound unary RPC against a peer address. Everything
141/// above this layer — the RPC packet framing, PoW, peer tracking, Kademlia — is network-
142/// agnostic and simply calls [`TransportFactory::rpc`].
143///
144/// Concrete implementations include the in-memory `MemTransportFactory` used by integration
145/// tests, `FullHttpsTransportFactory` in the server crate for production TLS+HTTPS, and
146/// `WasmTransportFactory` in the browser client that speaks HTTP via `gloo-net`. Swapping
147/// the factory on [`crate::tools::runtime_services::RuntimeServices`] changes the wire protocol
148/// without any other code having to care.
149#[async_trait::async_trait]
150pub trait TransportFactory: Send + Sync {
151 async fn get_bootstrap_addresses(&self) -> Vec<String>;
152 async fn create_server(&self, base_path: &str, port: u16, force_local_network: bool) -> anyhow::Result<Arc<dyn TransportServer>>;
153 async fn rpc(&self, address: &str, bytes: Bytes) -> anyhow::Result<Bytes>;
154}
155
/// Transport-agnostic test bodies.
///
/// Each function takes the [`TransportFactory`] under test, so the same scenario can be
/// run against every transport implementation. They are exposed outside of `cfg(test)`
/// through the `generic-tests` feature.
#[cfg(any(test, feature = "generic-tests"))]
pub mod tests {
    use crate::tools::time::{MILLIS_IN_MILLISECOND, MILLIS_IN_SECOND};
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::tools::get_temp_dir;
    use crate::tools::BytesGatherer;
    use crate::transport::transport::{IncomingRequest, TransportFactory, TransportServerHandler};
    use bytes::Bytes;
    use log::{info, trace};
    use std::sync::Arc;
    use tokio::join;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    /// The payload every test handler answers with, whatever the request was.
    const EXPECTED_REPLY: &str = "here is the reply";

    /// Handler shared by the tests below: ignores the request and returns
    /// [`EXPECTED_REPLY`].
    struct FixedReplyHandler;

    impl TransportServerHandler for FixedReplyHandler {
        async fn handle(&self, _: Bytes) -> BytesGatherer {
            BytesGatherer::from_bytes(Bytes::from(EXPECTED_REPLY))
        }
    }

    /// Round-trip test: starts one server on port 0, issues 20 RPCs against its address
    /// and checks every response, then cancels and verifies that the handler loop, the
    /// server and the driver all finished with `Ok`.
    pub async fn rpc_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider::default());
        // configure_logging_with_time_provider("trace", time_provider.clone());

        let cancellation_token = CancellationToken::new();
        // Bind the first tuple element to a name instead of `_`: a `_` pattern does not
        // bind, so the value would be dropped at the end of this statement.
        // NOTE(review): presumably this is a guard that deletes the directory on drop —
        // confirm against `get_temp_dir`. Keeping it alive is harmless either way.
        let (_temp_dir_guard, temp_dir_str) = get_temp_dir()?;
        let transport_server = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let address = transport_server.get_address().clone();
        trace!("server address is {}", address);

        let (tx, rx) = mpsc::channel::<IncomingRequest>(32);
        let handler = FixedReplyHandler;

        info!("running server and clients in parallel");
        let (handler_result, server_result, driver_result) = join!(
            handler.run(cancellation_token.clone(), rx),
            transport_server.listen(cancellation_token.clone(), tx),
            // The driver process
            async {
                info!("waiting for server to start");
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                for _ in 0..20 {
                    info!("calling server");
                    let bytes = Bytes::from("hello");
                    // Deliberately panics on failure: returning an error here would skip
                    // the cancellation below and leave the other two futures running.
                    let response = transport_factory.rpc(&address, bytes).await.unwrap();
                    assert_eq!(response, Bytes::from(EXPECTED_REPLY));
                    time_provider.sleep_millis(MILLIS_IN_MILLISECOND.const_mul(100)).await;
                }

                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                Ok::<(), anyhow::Error>(())
            }
        );

        // Check all the processes exited happily, surfacing the error if one did not
        assert!(handler_result.is_ok(), "handler loop failed: {:?}", handler_result);
        assert!(server_result.is_ok(), "server failed: {:?}", server_result);
        assert!(driver_result.is_ok(), "driver failed: {:?}", driver_result);

        Ok(())
    }

    /// Creates two servers on port 0 from the same factory, runs both (each with its own
    /// handler loop) for about a second, cancels, and verifies that every task finished
    /// with `Ok`.
    ///
    /// NOTE(review): presumably this checks that two port-0 servers can coexist without
    /// clashing — no requests are sent.
    pub async fn bind_port_zero_test(transport_factory: Arc<dyn TransportFactory>) -> anyhow::Result<()> {
        let time_provider = Arc::new(RealTimeProvider::default());
        // configure_logging_with_time_provider("trace", time_provider.clone());

        info!("starting test");

        let cancellation_token = CancellationToken::new();
        // See `rpc_test`: the first element is kept alive for the whole test rather than
        // being dropped immediately by a `_` pattern.
        let (_temp_dir_guard, temp_dir_str) = get_temp_dir()?;
        let transport_server_1 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;
        let transport_server_2 = transport_factory.create_server(&temp_dir_str, 0u16, true).await?;

        let (tx_1, rx_1) = mpsc::channel::<IncomingRequest>(32);
        let (tx_2, rx_2) = mpsc::channel::<IncomingRequest>(32);
        let handler = FixedReplyHandler;

        info!("running server and clients in parallel");
        let (handler_1_result, handler_2_result, server_1_result, server_2_result, driver_result) = join!(
            handler.run(cancellation_token.clone(), rx_1),
            handler.run(cancellation_token.clone(), rx_2),
            transport_server_1.listen(cancellation_token.clone(), tx_1),
            transport_server_2.listen(cancellation_token.clone(), tx_2),
            // The driver process
            async {
                info!("waiting for server to start");
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                info!("shutting down servers");
                cancellation_token.cancel();
                time_provider.sleep_millis(MILLIS_IN_SECOND.const_mul(1)).await;

                Ok::<(), anyhow::Error>(())
            }
        );

        // Check all the processes exited happily, surfacing the error if one did not
        assert!(handler_1_result.is_ok(), "handler loop 1 failed: {:?}", handler_1_result);
        assert!(handler_2_result.is_ok(), "handler loop 2 failed: {:?}", handler_2_result);
        assert!(server_1_result.is_ok(), "server 1 failed: {:?}", server_1_result);
        assert!(server_2_result.is_ok(), "server 2 failed: {:?}", server_2_result);
        assert!(driver_result.is_ok(), "driver failed: {:?}", driver_result);

        Ok(())
    }
}
275}