Skip to content

Commit

Permalink
sui-network: manually define rpc service
Browse files Browse the repository at this point in the history
Manually define our Validator rpc service instead of relying on a .proto
file. This has the advantange that for now we can use types already
defined in Rust for wire messages instead of requiring proto definitions
or using the awkward double serialization we were currently using.
bmwill committed May 6, 2022
1 parent 0d990f8 commit a0d21bf
Showing 13 changed files with 280 additions and 348 deletions.
66 changes: 3 additions & 63 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/sui-network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ tonic = "0.7"
prost = "0.10"
bincode = "1.3.3"
serde = "1.0.136"
sui-types = { path = "../../sui_types" }

[dev-dependencies]
tonic-build = { version = "0.7", features = [ "prost", "transport" ] }
prost-build = "0.10.1"
tonic-build = { git = "https://github.com/hyperium/tonic.git", rev = "de2e4ac077c076736dc451f3415ea7da1a61a560", default-features = false, features = [ "transport" ] }
2 changes: 1 addition & 1 deletion crates/sui-network/README.md
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
## Changing an RPC service

The general process for changing an RPC service is as follows:
1. Change the corresponding `.proto` file in the `proto` directory
1. Change the service definition in the `tests/bootstrap.rs` file.
2. Run `cargo test --test bootstrap` to re-run the code generation.
Generated rust files are in the `src/generated` directory.
3. Update any other corresponding logic that would have been affected by
12 changes: 0 additions & 12 deletions crates/sui-network/proto/common.proto

This file was deleted.

19 changes: 0 additions & 19 deletions crates/sui-network/proto/validator.proto

This file was deleted.

24 changes: 1 addition & 23 deletions crates/sui-network/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,11 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#[path = "generated/sui.validator.rs"]
#[path = "generated/sui.validator.Validator.rs"]
#[rustfmt::skip]
mod validator;

#[path = "generated/sui.common.rs"]
#[rustfmt::skip]
mod common;

pub use common::BincodeEncodedPayload;
pub use validator::{
validator_client::ValidatorClient,
validator_server::{Validator, ValidatorServer},
};

impl BincodeEncodedPayload {
pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T, bincode::Error> {
bincode::deserialize(self.payload.as_ref())
}

pub fn try_from<T: serde::Serialize>(value: &T) -> Result<Self, bincode::Error> {
let payload = bincode::serialize(value)?.into();
Ok(Self { payload })
}
}

impl From<bytes::Bytes> for BincodeEncodedPayload {
fn from(payload: bytes::Bytes) -> Self {
Self { payload }
}
}
65 changes: 65 additions & 0 deletions crates/sui-network/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use bytes::{Buf, BufMut};
use std::marker::PhantomData;
use tonic::{
codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
Status,
};

#[derive(Debug)]
pub struct BincodeEncoder<T>(PhantomData<T>);

impl<T: serde::Serialize> Encoder for BincodeEncoder<T> {
type Item = T;
type Error = Status;

fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
bincode::serialize_into(buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
}
}

#[derive(Debug)]
pub struct BincodeDecoder<U>(PhantomData<U>);

impl<U: serde::de::DeserializeOwned> Decoder for BincodeDecoder<U> {
type Item = U;
type Error = Status;

fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
if !buf.has_remaining() {
return Ok(None);
}

let item: Self::Item =
bincode::deserialize_from(buf.reader()).map_err(|e| Status::internal(e.to_string()))?;
Ok(Some(item))
}
}

/// A [`Codec`] that implements `application/grpc+bincode` via the serde library.
#[derive(Debug, Clone)]
pub struct BincodeCodec<T, U>(PhantomData<(T, U)>);

impl<T, U> Default for BincodeCodec<T, U> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<T, U> Codec for BincodeCodec<T, U>
where
T: serde::Serialize + Send + 'static,
U: serde::de::DeserializeOwned + Send + 'static,
{
type Encode = T;
type Decode = U;
type Encoder = BincodeEncoder<T>;
type Decoder = BincodeDecoder<U>;

fn encoder(&mut self) -> Self::Encoder {
BincodeEncoder(PhantomData)
}

fn decoder(&mut self) -> Self::Decoder {
BincodeDecoder(PhantomData)
}
}
9 changes: 0 additions & 9 deletions crates/sui-network/src/generated/sui.common.rs

This file was deleted.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/sui-network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub mod api;
pub mod codec;
pub mod network;

pub use prost;
pub use tonic;
84 changes: 74 additions & 10 deletions crates/sui-network/tests/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -6,26 +6,90 @@ use std::{
path::{Path, PathBuf},
process::Command,
};
use tonic_build::manual::{Builder, Method, Service};

type Result<T> = ::std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[test]
fn bootstrap() {
let proto_files = &["proto/validator.proto", "proto/common.proto"];
let dirs = &["proto"];

let out_dir = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
.join("src")
.join("generated");
let codec_path = "crate::codec::BincodeCodec";

// Use `Bytes` instead of `Vec<u8>` for bytes fields
let mut config = prost_build::Config::new();
config.bytes(&["."]);
let validator_service = Service::builder()
.name("Validator")
.package("sui.validator")
.comment("The Validator interface")
.method(
Method::builder()
.name("transaction")
.route_name("Transaction")
.input_type("sui_types::messages::Transaction")
.output_type("sui_types::messages::TransactionInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("confirmation_transaction")
.route_name("ConfirmationTransaction")
.input_type("sui_types::messages::CertifiedTransaction")
.output_type("sui_types::messages::TransactionInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("consensus_transaction")
.route_name("ConsensusTransaction")
.input_type("sui_types::messages::ConsensusTransaction")
.output_type("sui_types::messages::TransactionInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("account_info")
.route_name("AccountInfo")
.input_type("sui_types::messages::AccountInfoRequest")
.output_type("sui_types::messages::AccountInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("object_info")
.route_name("ObjectInfo")
.input_type("sui_types::messages::ObjectInfoRequest")
.output_type("sui_types::messages::ObjectInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("transaction_info")
.route_name("TransactionInfo")
.input_type("sui_types::messages::TransactionInfoRequest")
.output_type("sui_types::messages::TransactionInfoResponse")
.codec_path(codec_path)
.build(),
)
.method(
Method::builder()
.name("batch_info")
.route_name("BatchInfo")
.input_type("sui_types::messages::BatchInfoRequest")
.output_type("sui_types::messages::BatchInfoResponseItem")
.server_streaming()
.codec_path(codec_path)
.build(),
)
.build();

tonic_build::configure()
.out_dir(format!("{}", out_dir.display()))
.compile_with_config(config, proto_files, dirs)
.unwrap();
Builder::new()
.out_dir(&out_dir)
.compile(&[validator_service]);

prepend_license(&out_dir).unwrap();

99 changes: 37 additions & 62 deletions sui_core/src/authority_client.rs
Original file line number Diff line number Diff line change
@@ -6,11 +6,7 @@ use crate::authority::AuthorityState;
use async_trait::async_trait;
use futures::{stream::BoxStream, TryStreamExt};
use std::sync::Arc;
use sui_network::{
api::{BincodeEncodedPayload, ValidatorClient},
network::NetworkClient,
tonic,
};
use sui_network::{api::ValidatorClient, network::NetworkClient, tonic};
use sui_types::{error::SuiError, messages::*};

#[cfg(test)]
@@ -112,102 +108,81 @@ impl AuthorityAPI for NetworkAuthorityClient {
&self,
transaction: Transaction,
) -> Result<TransactionInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&transaction).unwrap();
let response = self.client().transaction(request).await?.into_inner();

response
.deserialize()
.map_err(|_| SuiError::UnexpectedMessage)
self.client()
.transaction(transaction)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

/// Confirm a transfer to a Sui or Primary account.
async fn handle_confirmation_transaction(
&self,
transaction: ConfirmationTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&transaction).unwrap();
let response = self
.client()
.confirmation_transaction(request)
.await?
.into_inner();

response
.deserialize()
.map_err(|_| SuiError::UnexpectedMessage)
self.client()
.confirmation_transaction(transaction.certificate)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

async fn handle_consensus_transaction(
&self,
transaction: ConsensusTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&transaction).unwrap();
let response = self
.client()
.consensus_transaction(request)
.await?
.into_inner();

response
.deserialize()
.map_err(|e| SuiError::GenericAuthorityError {
error: e.to_string(),
})
self.client()
.consensus_transaction(transaction)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&request).unwrap();
let response = self.client().account_info(request).await?.into_inner();

response
.deserialize()
.map_err(|_| SuiError::UnexpectedMessage)
self.client()
.account_info(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&request).unwrap();
let response = self.client().object_info(request).await?.into_inner();

response
.deserialize()
.map_err(|_| SuiError::UnexpectedMessage)
self.client()
.object_info(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

/// Handle Object information requests for this account.
async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError> {
let request = BincodeEncodedPayload::try_from(&request).unwrap();
let response = self.client().transaction_info(request).await?.into_inner();

response
.deserialize()
.map_err(|_| SuiError::UnexpectedMessage)
self.client()
.transaction_info(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}

/// Handle Batch information requests for this authority.
async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<BatchInfoResponseItemStream, SuiError> {
let request = BincodeEncodedPayload::try_from(&request).unwrap();
let response_stream = self.client().batch_info(request).await?.into_inner();

let stream = response_stream
.map_err(|_| SuiError::UnexpectedMessage)
.and_then(|item| {
let response_item = item
.deserialize::<BatchInfoResponseItem>()
.map_err(|_| SuiError::UnexpectedMessage);
futures::future::ready(response_item)
});
let stream = self
.client()
.batch_info(request)
.await
.map(tonic::Response::into_inner)?
.map_err(Into::into);

Ok(Box::pin(stream))
}
104 changes: 29 additions & 75 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ use async_trait::async_trait;
use futures::{stream::BoxStream, FutureExt, TryStreamExt};
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use sui_network::{
api::{BincodeEncodedPayload, Validator, ValidatorServer},
api::{Validator, ValidatorServer},
network::NetworkServer,
tonic,
};
@@ -145,12 +145,9 @@ impl AuthorityServer {
impl Validator for AuthorityServer {
async fn transaction(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let mut transaction: Transaction = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let mut transaction = request.into_inner();

let mut obligation = VerificationObligation::default();
transaction
@@ -178,20 +175,14 @@ impl Validator for AuthorityServer {
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&info)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(info))
}

async fn confirmation_transaction(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let mut transaction: CertifiedTransaction = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let mut transaction = request.into_inner();

let mut obligation = VerificationObligation::default();
transaction
@@ -221,121 +212,84 @@ impl Validator for AuthorityServer {
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&info)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(info))
}

async fn consensus_transaction(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let transaction: ConsensusTransaction = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<ConsensusTransaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let transaction = request.into_inner();

let info = self
.consensus_adapter
.submit(&transaction)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&info)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(info))
}

async fn account_info(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let request: AccountInfoRequest = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<AccountInfoRequest>,
) -> Result<tonic::Response<AccountInfoResponse>, tonic::Status> {
let request = request.into_inner();

let response = self
.state
.handle_account_info_request(request)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&response)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(response))
}

async fn object_info(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let request: ObjectInfoRequest = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<ObjectInfoRequest>,
) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
let request = request.into_inner();

let response = self
.state
.handle_object_info_request(request)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&response)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(response))
}

async fn transaction_info(
&self,
request: tonic::Request<BincodeEncodedPayload>,
) -> Result<tonic::Response<BincodeEncodedPayload>, tonic::Status> {
let request: TransactionInfoRequest = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
request: tonic::Request<TransactionInfoRequest>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let request = request.into_inner();

let response = self
.state
.handle_transaction_info_request(request)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let payload = BincodeEncodedPayload::try_from(&response)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(payload))
Ok(tonic::Response::new(response))
}

type BatchInfoStream = BoxStream<'static, Result<BincodeEncodedPayload, tonic::Status>>;
type BatchInfoStream = BoxStream<'static, Result<BatchInfoResponseItem, tonic::Status>>;

async fn batch_info(
&self,
request: tonic::Request<BincodeEncodedPayload>,
request: tonic::Request<BatchInfoRequest>,
) -> Result<tonic::Response<Self::BatchInfoStream>, tonic::Status> {
let request: BatchInfoRequest = request
.into_inner()
.deserialize()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
let request = request.into_inner();

let xstream = self
.state
.handle_batch_streaming(request)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let response =
// items
// .chain(subscriber)
xstream
.map_err(|e| tonic::Status::internal(e.to_string()))
.map_ok(|item| {
BincodeEncodedPayload::try_from(&item).expect("serialization should not fail")
});
let response = xstream.map_err(|e| tonic::Status::internal(e.to_string()));

Ok(tonic::Response::new(Box::pin(response)))
}

0 comments on commit a0d21bf

Please sign in to comment.