Skip to main content

hashiverse_lib/protocol/rpc/
rpc_request.rs

1//! # RPC request packet encode / decode
2//!
3//! The wire format of every outgoing and incoming RPC request. Split into a strict
4//! Tx / Rx pair so the compiler enforces the asymmetry between "I'm building a request"
5//! and "I'm parsing one":
6//!
7//! - [`RpcRequestPacketTx`] — build a request. `encode` optionally compresses the
8//!   payload, runs the PoW search (via a
9//!   [`crate::tools::parallel_pow_generator::ParallelPowGenerator`]) targeting the
10//!   destination server's identity, and produces a header containing the discriminator,
11//!   sponsor id, PoW timestamp, content hash, salt, and payload length.
12//! - [`RpcRequestPacketRx`] — parse a request on the receive side. Extracts the header,
13//!   verifies PoW against the receiving server's own identity, rejects under-powered
14//!   or stale requests (via [`crate::tools::config::POW_MAX_CLOCK_DRIFT_MILLIS`]) before
15//!   any payload work happens.
16//!
17//! The PoW sits in the *header* and binds to the server's identity, so a request valid
18//! for server A can't be forwarded to server B and still authenticate.
19
20use crate::protocol::payload::payload::PayloadRequestKind;
21use crate::tools::parallel_pow_generator::ParallelPowGenerator;
22use crate::tools::server_id::ServerId;
23use crate::tools::time::{TimeMillis, TimeMillisBytes, TIME_MILLIS_BYTES};
24use crate::tools::time_provider::time_provider::TimeProvider;
25use crate::tools::types::{Hash, Id, PQCommitmentBytes, Pow, Salt, VerificationKeyBytes, HASH_BYTES, ID_BYTES, SALT_BYTES};
26use crate::tools::{compression, config, hashing};
27use bitflags::bitflags;
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29
30bitflags! {
31    pub struct RpcRequestPacketTxFlags: u8 {
32        const COMPRESSED = 1 << 0;
33        const SERVER_KNOWN = 1 << 1;
34    }
35}
36
37/// An outbound RPC request, freshly encoded and ready to be sent over a [`crate::transport::TransportFactory`].
38///
39/// The "Tx" (transmit) side owns the encode path. Constructing one is expensive: in addition
40/// to compressing the payload and prefixing the header, `encode` performs a proof-of-work
41/// search against the destination server's identity via [`ParallelPowGenerator`]. The `pow`
42/// sits in the wire header so the server can reject under-powered requests cheaply before
43/// doing any payload work. `pow_content_hash` is retained on the struct so the caller can
44/// later verify that the response was produced for *this* specific request (see
45/// [`RpcResponsePacketRx`]).
46///
47/// This type and its "Rx" counterpart [`RpcRequestPacketRx`] are intentionally separate so
48/// the encode side and the decode side cannot be accidentally mixed up.
49pub struct RpcRequestPacketTx {
50    pub pow_content_hash: Hash,
51    pub bytes: Bytes,
52}
53impl RpcRequestPacketTx {
54    pub async fn encode(
55        time_provider: &dyn TimeProvider,
56        pow_minimum_per_rpc: Pow,
57        flags: RpcRequestPacketTxFlags,
58        payload_request_kind: PayloadRequestKind,
59        pow_sponsor_id: &Id,
60        destination_verification_key_bytes: &VerificationKeyBytes,
61        destination_pq_commitment_bytes: &PQCommitmentBytes,
62        payload_uncompressed: Bytes,
63        pow_generator: &dyn ParallelPowGenerator,
64    ) -> anyhow::Result<Self> {
65        // Do we actually need to compress this?
66        let payload_compressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
67            true => compression::compress_for_speed(payload_uncompressed.as_ref())?.to_bytes(),
68            false => payload_uncompressed,
69        };
70
71        // Check that it is not too large...
72        if payload_compressed.len() > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
73            anyhow::bail!("request payload size exceeds maximum allowed size: {} > {}", payload_compressed.len(), config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
74        }
75
76        // This will be needed when we verify that the server processed our request
77        let pow_content_hash: Hash = hashing::hash(payload_compressed.as_ref());
78
79        // Do some proof of work on behalf of the destination server, sponsored by the caller
80        let (pow_timestamp, pow_salt, _, _) = ServerId::pow_generate(time_provider, pow_minimum_per_rpc, pow_sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, &pow_content_hash, pow_generator).await?;
81
82        // Write the header
83        let mut bytes_mut = BytesMut::with_capacity(size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() + payload_compressed.len());
84        bytes_mut.put_u8(1);
85        bytes_mut.put_u8(flags.bits());
86        bytes_mut.put_u16_le(payload_request_kind as u16);
87        bytes_mut.put_slice(pow_sponsor_id.as_ref());
88        bytes_mut.put_slice(pow_timestamp.encode_be().as_ref());
89        bytes_mut.put_slice(pow_content_hash.as_ref());
90        bytes_mut.put_slice(pow_salt.as_ref());
91        bytes_mut.put_u32_le(payload_compressed.len() as u32);
92        bytes_mut.put_slice(&payload_compressed);
93
94        let bytes = bytes_mut.freeze();
95
96        Ok(Self { pow_content_hash, bytes })
97    }
98}
99
100/// The server-side view of an inbound RPC request after header parsing and PoW verification.
101///
102/// `RpcRequestPacketRx` is what falls out of `decode` on the receiving end. It exposes the
103/// routing discriminator ([`PayloadRequestKind`]) so the dispatcher knows which handler to
104/// invoke, plus all the PoW fields needed to decide how much trust to assign to the call.
105/// The `bytes` field still contains the compressed payload — payload decoding happens later,
106/// after the dispatcher has picked the right handler.
107///
108/// Paired with [`RpcRequestPacketTx`] as the decode side of the same wire format.
109#[derive(Debug)]
110pub struct RpcRequestPacketRx {
111    pub payload_request_kind: PayloadRequestKind,
112    pub pow_sponsor_id: Id,
113    pub pow_server_known: bool,
114    pub pow: Pow,
115    pub pow_timestamp: TimeMillis,
116    pub pow_content_hash: Hash,
117    pub pow_salt: Salt,
118    pub bytes: Bytes,
119}
120impl RpcRequestPacketRx {
121    pub fn decode(current_time_millis: &TimeMillis, verification_key_bytes: &VerificationKeyBytes, pq_commitment_bytes: &PQCommitmentBytes, mut response_bytes: Bytes) -> anyhow::Result<Self> {
122        // Do we have enough bytes for the header?
123        if response_bytes.len() < size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() {
124            anyhow::bail!("RpcRequestPacket is too short for header");
125        }
126
127        let version = response_bytes.get_u8();
128        if 1 != version {
129            anyhow::bail!("Unsupported RpcRequestPacket version: {}", version);
130        }
131
132        let flags = RpcRequestPacketTxFlags::from_bits(response_bytes.get_u8()).ok_or_else(|| anyhow::anyhow!("Invalid RpcRequestPacket flags"))?;
133        let payload_request_kind = PayloadRequestKind::from_u16(response_bytes.get_u16_le())?;
134        let pow_sponsor_id = Id::from_slice(response_bytes.slice(..ID_BYTES).as_ref())?;
135        response_bytes.advance(ID_BYTES);
136        let pow_timestamp_bytes: TimeMillisBytes = TimeMillisBytes::from_bytes(response_bytes.slice(..TIME_MILLIS_BYTES).as_ref())?;
137        response_bytes.advance(TIME_MILLIS_BYTES);
138        let pow_content_hash: Hash = Hash::from_slice(response_bytes.slice(..HASH_BYTES).as_ref())?;
139        response_bytes.advance(HASH_BYTES);
140        let pow_salt = Salt::from_slice(response_bytes.slice(..SALT_BYTES).as_ref())?;
141        response_bytes.advance(SALT_BYTES);
142
143        // Is the packet timestamp close enough to ours?
144        let pow_timestamp = TimeMillis::timestamp_decode_be(&pow_timestamp_bytes);
145        {
146            let delta_millis = (*current_time_millis - pow_timestamp).abs();
147            if delta_millis.0 > config::POW_MAX_CLOCK_DRIFT_MILLIS.0 {
148                anyhow::bail!("Client pow clock drift too large: us={} them={}", current_time_millis, pow_timestamp);
149            }
150        }
151
152        // Has the client done enough pow for us?
153        let (pow, pow_server_known) = match flags.contains(RpcRequestPacketTxFlags::SERVER_KNOWN) {
154            true => {
155                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, verification_key_bytes, pq_commitment_bytes, &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
156                if pow < config::POW_MINIMUM_PER_RPC_SERVER_KNOWN {
157                    anyhow::bail!("Client has not done enough known pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_KNOWN);
158                }
159                (pow, true)
160            }
161            false => {
162                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, &VerificationKeyBytes::zero(), &PQCommitmentBytes::zero(), &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
163                if pow < config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN {
164                    anyhow::bail!("Client has not done enough unknown pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN);
165                }
166                (pow, false)
167            }
168        };
169
170        let request_payload_len = response_bytes.get_u32_le() as usize;
171
172        if request_payload_len > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
173            anyhow::bail!("RpcRequestPacket payload too large: {} > {}", request_payload_len, config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
174        }
175
176        // Do we have enough bytes for the payload?
177        if response_bytes.len() < request_payload_len {
178            anyhow::bail!("RpcRequestPacket is too short for payload");
179        }
180
181        let response_payload = response_bytes.slice(..request_payload_len);
182        response_bytes.advance(request_payload_len);
183
184        // Sanity check - are we done?
185        if !response_bytes.is_empty() {
186            anyhow::bail!("RpcRequestPacket is too long");
187        }
188
189        // Do we need to decompress?
190        let response_payload_decompressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
191            true => compression::decompress(response_payload.as_ref())?.to_bytes(),
192            false => response_payload,
193        };
194
195        let selfie = Self {
196            payload_request_kind,
197            pow_sponsor_id,
198            pow_server_known,
199            pow,
200            pow_timestamp,
201            pow_content_hash,
202            pow_salt,
203            bytes: response_payload_decompressed,
204        };
205        // trace!("Decoded RpcRequestPacketRx={:?}", selfie);
206
207        Ok(selfie)
208    }
209}
210