Skip to main content

hashiverse_lib/protocol/rpc/
rpc.rs

1//! # High-level RPC client helpers
2//!
3//! The user-facing side of the RPC layer. Where
4//! [`crate::protocol::rpc::rpc_request`] and
5//! [`crate::protocol::rpc::rpc_response`] deal with packet bytes, this module exposes
6//! end-to-end helpers that:
7//!
8//! 1. **Encode** the request with the right PoW difficulty (higher when the peer is
9//!    unknown, lower when the peer is already in the tracker — see
10//!    [`rpc_server_known_with_requisite_pow`]).
11//! 2. **Dispatch** over the supplied transport.
12//! 3. **Decode** the response, verifying the signature over the request's
13//!    `pow_content_hash` — so a reply can only have come from the specific server we
14//!    addressed, not a replay from another peer.
15//! 4. **Return** the decoded response packet, or an error built from the server's `ErrorResponseV1`.
16//!
17//! Call sites either use [`rpc_server_unknown`] (first contact / bootstrap) or
18//! [`rpc_server_known`] (everything else). Trust-sensitive handlers can opt into a
19//! higher PoW floor via the `_with_requisite_pow` variants; the `_no_compression` variants send the request without the `COMPRESSED` flag.
20
21use bytes::Bytes;
22use crate::protocol::payload::payload::{ErrorResponseV1, PayloadRequestKind, PayloadResponseKind};
23use crate::protocol::peer::Peer;
24use crate::protocol::rpc::rpc_request::{RpcRequestPacketTx, RpcRequestPacketTxFlags};
25use crate::protocol::rpc::rpc_response::RpcResponsePacketRx;
26use crate::tools::runtime_services::RuntimeServices;
27use crate::tools::types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes};
28use crate::tools::{config, json};
29
30pub async fn rpc_server_unknown(
31    runtime_services: &RuntimeServices,
32    sponsor_id: &Id,
33    destination_address: &String,
34    payload_request_kind: PayloadRequestKind,
35    payload: Bytes,
36) -> anyhow::Result<RpcResponsePacketRx> {
37    let destination_id = Id::zero();
38    let destination_verification_key = VerificationKeyBytes::zero();
39    let destination_pq_commitment_bytes = PQCommitmentBytes::zero();
40    let flags = RpcRequestPacketTxFlags::COMPRESSED;
41    rpc_server_xxx(
42        runtime_services,
43        config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN,
44        flags,
45        sponsor_id,
46        destination_address,
47        &destination_id,
48        &destination_verification_key,
49        &destination_pq_commitment_bytes,
50        payload_request_kind,
51        payload,
52    )
53    .await
54}
55
56pub async fn rpc_server_known(
57    runtime_services: &RuntimeServices,
58    sponsor_id: &Id,
59    destination_peer: &Peer,
60    payload_request_kind: PayloadRequestKind,
61    payload: Bytes,
62) -> anyhow::Result<RpcResponsePacketRx> {
63    let flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;
64    rpc_server_xxx(
65        runtime_services,
66        config::POW_MINIMUM_PER_RPC_SERVER_KNOWN,
67        flags,
68        sponsor_id,
69        &destination_peer.address,
70        &destination_peer.id,
71        &destination_peer.verification_key_bytes,
72        &destination_peer.pq_commitment_bytes,
73        payload_request_kind,
74        payload,
75    )
76    .await
77}
78
79pub async fn rpc_server_known_with_no_compression(
80    runtime_services: &RuntimeServices,
81    sponsor_id: &Id,
82    destination_peer: &Peer,
83    payload_request_kind: PayloadRequestKind,
84    payload: Bytes,
85) -> anyhow::Result<RpcResponsePacketRx> {
86    let flags = RpcRequestPacketTxFlags::SERVER_KNOWN;
87    rpc_server_xxx(
88        runtime_services,
89        config::POW_MINIMUM_PER_RPC_SERVER_KNOWN,
90        flags,
91        sponsor_id,
92        &destination_peer.address,
93        &destination_peer.id,
94        &destination_peer.verification_key_bytes,
95        &destination_peer.pq_commitment_bytes,
96        payload_request_kind,
97        payload,
98    )
99    .await
100}
101
102pub async fn rpc_server_known_with_requisite_pow(
103    runtime_services: &RuntimeServices,
104    sponsor_id: &Id,
105    destination_peer: &Peer,
106    payload_request_kind: PayloadRequestKind,
107    payload: Bytes,
108    requisite_pow: Pow,
109) -> anyhow::Result<RpcResponsePacketRx> {
110    let flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;
111    rpc_server_xxx(
112        runtime_services,
113        requisite_pow,
114        flags,
115        sponsor_id,
116        &destination_peer.address,
117        &destination_peer.id,
118        &destination_peer.verification_key_bytes,
119        &destination_peer.pq_commitment_bytes,
120        payload_request_kind,
121        payload,
122    )
123    .await
124}
125
126pub async fn rpc_server_known_with_requisite_pow_and_no_compression(
127    runtime_services: &RuntimeServices,
128    sponsor_id: &Id,
129    destination_peer: &Peer,
130    payload_request_kind: PayloadRequestKind,
131    payload: Bytes,
132    requisite_pow: Pow,
133) -> anyhow::Result<RpcResponsePacketRx> {
134    let flags = RpcRequestPacketTxFlags::SERVER_KNOWN;
135    rpc_server_xxx(
136        runtime_services,
137        requisite_pow,
138        flags,
139        sponsor_id,
140        &destination_peer.address,
141        &destination_peer.id,
142        &destination_peer.verification_key_bytes,
143        &destination_peer.pq_commitment_bytes,
144        payload_request_kind,
145        payload,
146    )
147    .await
148}
149
150async fn rpc_server_xxx(
151    runtime_services: &RuntimeServices,
152    pow_minimum_per_rpc: Pow,
153    flags: RpcRequestPacketTxFlags,
154    sponsor_id: &Id,
155    destination_address: &str,
156    destination_id: &Id,
157    destination_verification_key_bytes: &VerificationKeyBytes,
158    destination_pq_commitment_bytes: &PQCommitmentBytes,
159    payload_request_kind: PayloadRequestKind,
160    payload: Bytes,
161) -> anyhow::Result<RpcResponsePacketRx> {
162    let rpc_request_packet = RpcRequestPacketTx::encode(runtime_services.time_provider.as_ref(), pow_minimum_per_rpc, flags, payload_request_kind, sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, payload, runtime_services.pow_generator.as_ref()).await?;
163
164    let response_bytes = runtime_services.transport_factory.rpc(destination_address, rpc_request_packet.bytes).await?;
165
166    let rpc_response_packet = RpcResponsePacketRx::decode(destination_id, &rpc_request_packet.pow_content_hash, config::SERVER_KEY_POW_MIN, response_bytes)?;
167
168    // Check if there was a server error
169    if rpc_response_packet.response_request_kind == PayloadResponseKind::ErrorResponseV1 {
170        let response = json::bytes_to_struct::<ErrorResponseV1>(&rpc_response_packet.bytes)?;
171        return Err(anyhow::anyhow!("server error {}: {}", response.code, response.message));
172    }
173
174    Ok(rpc_response_packet)
175}
176
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use bytes::Bytes;
    use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind};
    use crate::protocol::rpc::rpc_request::{RpcRequestPacketRx, RpcRequestPacketTx, RpcRequestPacketTxFlags};
    use crate::protocol::rpc::rpc_response::{RpcResponsePacketRx, RpcResponsePacketTx, RpcResponsePacketTxFlags};
    use crate::tools::hashing;
    use crate::tools::parallel_pow_generator::StubParallelPowGenerator;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::TimeMillis;
    use crate::tools::time_provider::time_provider::RealTimeProvider;
    use crate::tools::tools;
    use crate::tools::{types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes}, BytesGatherer};
    use log::trace;
    use crate::tools::runtime_services::RuntimeServices;

    /// Returns `true` when `RpcRequestPacketRx::decode` rejects `data`, using a zero
    /// timestamp, verification key and PQ commitment as the decoding context.
    fn request_decode_is_rejected(data: Bytes) -> bool {
        let timestamp = TimeMillis::zero();
        let verification_key_bytes = VerificationKeyBytes::zero();
        let pq_commitment_bytes = PQCommitmentBytes::zero();
        RpcRequestPacketRx::decode(&timestamp, &verification_key_bytes, &pq_commitment_bytes, data).is_err()
    }

    /// Returns `true` when `RpcResponsePacketRx::decode` rejects `data`, using a zero
    /// destination id, the hash of a single zero byte and `Pow(0)` as the decoding context.
    fn response_decode_is_rejected(data: Bytes) -> bool {
        let destination_id = Id::zero();
        let pow_content_hash = hashing::hash(&[0u8]);
        RpcResponsePacketRx::decode(&destination_id, &pow_content_hash, Pow(0), data).is_err()
    }

    /// Round-trips a request and its response with the `SERVER_KNOWN` flag set.
    #[tokio::test]
    async fn rpc_request_packet_txrx() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();

        let pow_min = Pow(12);
        let pow_minimum_per_rpc = Pow(12);

        let server_id = ServerId::new(runtime_services.time_provider.as_ref(), pow_min, true, runtime_services.pow_generator.as_ref()).await?;
        trace!("server_id.pow={}", server_id.pow);
        let payload_request_kind = PayloadRequestKind::AnnounceV1;
        let mut payload_request = [0u8; 1024];
        tools::random_fill_bytes(&mut payload_request);
        let request_flags = RpcRequestPacketTxFlags::COMPRESSED | RpcRequestPacketTxFlags::SERVER_KNOWN;

        let rpc_request_packet_tx = RpcRequestPacketTx::encode(
            runtime_services.time_provider.as_ref(),
            pow_minimum_per_rpc,
            request_flags,
            payload_request_kind.clone(),
            &server_id.id,
            &server_id.keys.verification_key_bytes,
            &server_id.keys.pq_commitment_bytes,
            Bytes::copy_from_slice(&payload_request),
            runtime_services.pow_generator.as_ref(),
        )
        .await?;

        let rpc_request_packet_rx = RpcRequestPacketRx::decode(
            &server_id.timestamp,
            &server_id.keys.verification_key_bytes,
            &server_id.keys.pq_commitment_bytes,
            rpc_request_packet_tx.bytes,
        )?;
        assert_eq!(payload_request_kind, rpc_request_packet_rx.payload_request_kind);
        assert_eq!(payload_request, rpc_request_packet_rx.bytes.as_ref());
        assert!(rpc_request_packet_rx.pow_server_known);

        let mut payload_response = [0u8; 1024];
        tools::random_fill_bytes(&mut payload_response);
        let response_flags = RpcResponsePacketTxFlags::COMPRESSED;

        let gatherer = RpcResponsePacketTx::encode(
            &server_id.keys.signature_key,
            &server_id.keys.verification_key_bytes,
            &server_id.keys.pq_commitment_bytes,
            &server_id.sponsor_id,
            &server_id.timestamp,
            &server_id.hash,
            &server_id.salt,
            &rpc_request_packet_rx.pow_content_hash,
            response_flags,
            PayloadResponseKind::AnnounceResponseV1,
            BytesGatherer::from_bytes(Bytes::copy_from_slice(&payload_response)),
        )?;

        let rpc_response_packet_rx = RpcResponsePacketRx::decode(&server_id.id, &rpc_request_packet_rx.pow_content_hash, pow_min, gatherer.to_bytes())?;

        assert_eq!(rpc_response_packet_rx.response_request_kind, PayloadResponseKind::AnnounceResponseV1);
        assert_eq!(rpc_response_packet_rx.bytes.as_ref(), payload_response);

        Ok(())
    }

    /// Round-trips a request and its response without the `SERVER_KNOWN` flag, encoding the
    /// request with zeroed id and keys and decoding the response against `Id::zero()`.
    #[tokio::test]
    async fn rpc_request_packet_txrx_server_unknown() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider::default();
        let pow_generator: Arc<dyn crate::tools::parallel_pow_generator::ParallelPowGenerator> = Arc::new(StubParallelPowGenerator::new());

        let pow_min_for_server_id = Pow(12);
        let pow_min_for_rpc = Pow(12);

        let server_id = ServerId::new(&time_provider, pow_min_for_server_id, true, pow_generator.as_ref()).await?;
        let payload_request_kind = PayloadRequestKind::AnnounceV1;
        let mut payload_request = [0u8; 1024];
        tools::random_fill_bytes(&mut payload_request);
        let flags = RpcRequestPacketTxFlags::COMPRESSED;

        let rpc_request_packet_tx = RpcRequestPacketTx::encode(
            &time_provider,
            pow_min_for_rpc,
            flags,
            payload_request_kind.clone(),
            &Id::zero(),
            &VerificationKeyBytes::zero(),
            &PQCommitmentBytes::zero(),
            Bytes::copy_from_slice(&payload_request),
            pow_generator.as_ref(),
        )
        .await?;

        let rpc_request_packet_rx = RpcRequestPacketRx::decode(
            &server_id.timestamp,
            &server_id.keys.verification_key_bytes,
            &server_id.keys.pq_commitment_bytes,
            rpc_request_packet_tx.bytes,
        )?;
        assert_eq!(payload_request_kind, rpc_request_packet_rx.payload_request_kind);
        assert_eq!(payload_request, rpc_request_packet_rx.bytes.as_ref());
        assert!(!rpc_request_packet_rx.pow_server_known);

        let mut payload_response = [0u8; 1024];
        tools::random_fill_bytes(&mut payload_response);
        let response_flags = RpcResponsePacketTxFlags::COMPRESSED;
        let gatherer = RpcResponsePacketTx::encode(
            &server_id.keys.signature_key,
            &server_id.keys.verification_key_bytes,
            &server_id.keys.pq_commitment_bytes,
            &server_id.sponsor_id,
            &server_id.timestamp,
            &server_id.hash,
            &server_id.salt,
            &rpc_request_packet_rx.pow_content_hash,
            response_flags,
            PayloadResponseKind::AnnounceResponseV1,
            BytesGatherer::from_bytes(Bytes::copy_from_slice(&payload_response)),
        )?;

        let rpc_response_packet_rx = RpcResponsePacketRx::decode(&Id::zero(), &rpc_request_packet_rx.pow_content_hash, pow_min_for_server_id, gatherer.to_bytes())?;

        assert_eq!(rpc_response_packet_rx.response_request_kind, PayloadResponseKind::AnnounceResponseV1);
        assert_eq!(rpc_response_packet_rx.bytes.as_ref(), payload_response);

        Ok(())
    }

    // ── Robustness tests: RpcRequestPacketRx::decode ──

    #[test]
    fn rpc_request_decode_empty_input() {
        assert!(request_decode_is_rejected(Bytes::new()));
    }

    #[test]
    fn rpc_request_decode_single_byte() {
        assert!(request_decode_is_rejected(Bytes::from_static(&[1u8])));
    }

    #[test]
    fn rpc_request_decode_garbage() {
        assert!(request_decode_is_rejected(Bytes::from_static(&[0xff; 256])));
    }

    // ── Robustness tests: RpcResponsePacketRx::decode ──

    #[test]
    fn rpc_response_decode_empty_input() {
        assert!(response_decode_is_rejected(Bytes::new()));
    }

    #[test]
    fn rpc_response_decode_single_byte() {
        assert!(response_decode_is_rejected(Bytes::from_static(&[1u8])));
    }

    #[test]
    fn rpc_response_decode_garbage() {
        assert!(response_decode_is_rejected(Bytes::from_static(&[0xff; 256])));
    }

    #[test]
    fn rpc_response_decode_header_too_short_for_payload_len() {
        // The header up to (but not including) the u32 payload_len field is 212 bytes.
        // Providing 214 bytes (header + 2) used to pass the old `+ 2` minimum-length check
        // but panic on get_u32_le() which needs 4 bytes. Now correctly rejected.
        let mut data = vec![0u8; 214];
        data[0] = 1; // version
        assert!(response_decode_is_rejected(Bytes::from(data)));
    }

    /// Fuzz targets that feed arbitrary bytes to both decoders; the decode result is
    /// discarded, so these only fail if decoding panics.
    #[cfg(not(target_arch = "wasm32"))]
    mod bolero_fuzz {
        use bytes::Bytes;
        use crate::protocol::rpc::rpc_request::RpcRequestPacketRx;
        use crate::protocol::rpc::rpc_response::RpcResponsePacketRx;
        use crate::tools::types::{Id, PQCommitmentBytes, Pow, VerificationKeyBytes};
        use crate::tools::time::TimeMillis;
        use crate::tools::hashing;

        #[test]
        fn fuzz_rpc_request_decode() {
            bolero::check!().for_each(|data: &[u8]| {
                let timestamp = TimeMillis::zero();
                let verification_key_bytes = VerificationKeyBytes::zero();
                let pq_commitment_bytes = PQCommitmentBytes::zero();
                let _ = RpcRequestPacketRx::decode(&timestamp, &verification_key_bytes, &pq_commitment_bytes, Bytes::copy_from_slice(data));
            });
        }

        #[test]
        fn fuzz_rpc_response_decode() {
            bolero::check!().for_each(|data: &[u8]| {
                let destination_id = Id::zero();
                let pow_content_hash = hashing::hash(&[0u8]);
                let _ = RpcResponsePacketRx::decode(&destination_id, &pow_content_hash, Pow(0), Bytes::copy_from_slice(data));
            });
        }
    }
}