Skip to content

Commit

Permalink
proto: move enum Message from geyser crate (#459)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Nov 14, 2024
1 parent 32d21df commit 6d56e68
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 120 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ jobs:
run: cargo check -p yellowstone-grpc-geyser --all-targets --tests
- name: check features in `proto`
run: cargo check -p yellowstone-grpc-proto --all-targets --tests
- name: check features in `proto`
run: cargo check -p yellowstone-grpc-proto --all-targets --tests --all-features

- name: Build
run: ./ci/cargo-build-test.sh
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The minor version will be incremented upon a breaking change and the patch versi
- examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456))
- proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457))
- proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458))
- proto: move enum Message from geyser crate ([#459](https://github.com/rpcpool/yellowstone-grpc/pull/459))

### Breaking

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"blocks_meta": {
"max": 1
},
"entry": {
"entries": {
"max": 1
}
}
Expand Down
6 changes: 3 additions & 3 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub struct ConfigGrpcFilters {
pub transactions_status: ConfigGrpcFiltersTransactions,
pub blocks: ConfigGrpcFiltersBlocks,
pub blocks_meta: ConfigGrpcFiltersBlocksMeta,
pub entry: ConfigGrpcFiltersEntry,
pub entries: ConfigGrpcFiltersEntries,
}

impl ConfigGrpcFilters {
Expand Down Expand Up @@ -376,12 +376,12 @@ impl Default for ConfigGrpcFiltersBlocksMeta {

#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ConfigGrpcFiltersEntry {
pub struct ConfigGrpcFiltersEntries {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max: usize,
}

impl Default for ConfigGrpcFiltersEntry {
impl Default for ConfigGrpcFiltersEntries {
fn default() -> Self {
Self { max: usize::MAX }
}
Expand Down
69 changes: 36 additions & 33 deletions yellowstone-grpc-geyser/src/filters.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
use {
crate::{
config::{
ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks,
ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntry, ConfigGrpcFiltersSlots,
ConfigGrpcFiltersTransactions,
},
message::{
Message, MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta,
MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo,
},
crate::config::{
ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks,
ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntries, ConfigGrpcFiltersSlots,
ConfigGrpcFiltersTransactions,
},
base64::{engine::general_purpose::STANDARD as base64_engine, Engine},
solana_sdk::{pubkey::Pubkey, signature::Signature},
Expand All @@ -21,13 +15,20 @@ use {
},
yellowstone_grpc_proto::{
convert_to,
plugin::filter::{FilterAccountsDataSlice, FilterName, FilterNames},
plugin::{
filter::{FilterAccountsDataSlice, FilterName, FilterNames},
message::{
CommitmentLevel, Message, MessageAccount, MessageAccountInfo, MessageBlock,
MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction,
MessageTransactionInfo,
},
},
prelude::{
subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports,
subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts,
subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto,
SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts,
SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry, SubscribeRequestFilterSlots,
Expand Down Expand Up @@ -193,7 +194,7 @@ pub struct Filter {
slots: FilterSlots,
transactions: FilterTransactions,
transactions_status: FilterTransactions,
entry: FilterEntry,
entries: FilterEntries,
blocks: FilterBlocks,
blocks_meta: FilterBlocksMeta,
commitment: CommitmentLevel,
Expand All @@ -214,7 +215,7 @@ impl Default for Filter {
filter_type: FilterTransactionsType::TransactionStatus,
filters: HashMap::new(),
},
entry: FilterEntry::default(),
entries: FilterEntries::default(),
blocks: FilterBlocks::default(),
blocks_meta: FilterBlocksMeta::default(),
commitment: CommitmentLevel::Processed,
Expand Down Expand Up @@ -245,7 +246,7 @@ impl Filter {
FilterTransactionsType::TransactionStatus,
names,
)?,
entry: FilterEntry::new(&config.entry, &limit.entry, names)?,
entries: FilterEntries::new(&config.entry, &limit.entries, names)?,
blocks: FilterBlocks::new(&config.blocks, &limit.blocks, names)?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta, names)?,
commitment: Self::decode_commitment(config.commitment)?,
Expand All @@ -258,10 +259,12 @@ impl Filter {
}

fn decode_commitment(commitment: Option<i32>) -> anyhow::Result<CommitmentLevel> {
let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32);
CommitmentLevel::try_from(commitment).map_err(|_error| {
anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}")
})
let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32);
CommitmentLevelProto::try_from(commitment)
.map(Into::into)
.map_err(|_error| {
anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}")
})
}

fn decode_pubkeys<'a>(
Expand Down Expand Up @@ -296,7 +299,7 @@ impl Filter {
"transactions_status",
self.transactions_status.filters.len(),
),
("entry", self.entry.filters.len()),
("entries", self.entries.filters.len()),
("blocks", self.blocks.filters.len()),
("blocks_meta", self.blocks_meta.filters.len()),
(
Expand All @@ -305,7 +308,7 @@ impl Filter {
+ self.slots.filters.len()
+ self.transactions.filters.len()
+ self.transactions_status.filters.len()
+ self.entry.filters.len()
+ self.entries.filters.len()
+ self.blocks.filters.len()
+ self.blocks_meta.filters.len(),
),
Expand All @@ -329,7 +332,7 @@ impl Filter {
.get_filters(message)
.chain(self.transactions_status.get_filters(message)),
),
Message::Entry(message) => self.entry.get_filters(message),
Message::Entry(message) => self.entries.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message),
Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
}
Expand Down Expand Up @@ -912,14 +915,14 @@ impl FilterTransactions {
}

#[derive(Debug, Default, Clone)]
struct FilterEntry {
struct FilterEntries {
filters: Vec<FilterName>,
}

impl FilterEntry {
impl FilterEntries {
fn new(
configs: &HashMap<String, SubscribeRequestFilterEntry>,
limit: &ConfigGrpcFiltersEntry,
limit: &ConfigGrpcFiltersEntries,
names: &mut FilterNames,
) -> anyhow::Result<Self> {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
Expand Down Expand Up @@ -1143,11 +1146,7 @@ pub fn parse_accounts_data_slice_create(
mod tests {
use {
super::{FilterName, FilterNames, FilteredMessage},
crate::{
config::ConfigGrpcFilters,
filters::Filter,
message::{Message, MessageTransaction, MessageTransactionInfo},
},
crate::{config::ConfigGrpcFilters, filters::Filter},
solana_sdk::{
hash::Hash,
message::{v0::LoadedAddresses, Message as SolMessage, MessageHeader},
Expand All @@ -1157,8 +1156,12 @@ mod tests {
},
solana_transaction_status::TransactionStatusMeta,
std::{collections::HashMap, sync::Arc, time::Duration},
yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions,
yellowstone_grpc_proto::{
geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterTransactions,
},
plugin::message::{Message, MessageTransaction, MessageTransactionInfo},
},
};

Expand Down
37 changes: 26 additions & 11 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
filters::Filter,
message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo},
metrics::{self, DebugClientMessage},
version::GrpcVersionInfo,
},
Expand Down Expand Up @@ -37,11 +36,17 @@ use {
},
tonic_health::server::health_reporter,
yellowstone_grpc_proto::{
plugin::filter::FilterNames,
plugin::{
filter::FilterNames,
message::{
CommitmentLevel, Message, MessageBlockMeta, MessageEntry, MessageSlot,
MessageTransactionInfo,
},
},
prelude::{
geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof,
CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse,
CommitmentLevel as CommitmentLevelProto, GetBlockHeightRequest, GetBlockHeightResponse,
GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse,
GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest,
IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate,
Expand Down Expand Up @@ -156,11 +161,13 @@ impl BlockMetaStorage {
}

fn parse_commitment(commitment: Option<i32>) -> Result<CommitmentLevel, Status> {
let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32);
CommitmentLevel::try_from(commitment).map_err(|_error| {
let msg = format!("failed to create CommitmentLevel from {commitment:?}");
Status::unknown(msg)
})
let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32);
CommitmentLevelProto::try_from(commitment)
.map(Into::into)
.map_err(|_error| {
let msg = format!("failed to create CommitmentLevel from {commitment:?}");
Status::unknown(msg)
})
}

async fn get_block<F, T>(
Expand Down Expand Up @@ -533,13 +540,21 @@ impl GrpcService {

// If we already build Block message, new message will be a problem
if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) {
metrics::update_invalid_blocks(format!("unexpected message {}", message.kind()));
let kind = match &message {
Message::Slot(_) => "Slot",
Message::Account(_) => "Account",
Message::Transaction(_) => "Transaction",
Message::Entry(_) => "Entry",
Message::BlockMeta(_) => "BlockMeta",
Message::Block(_) => "Block",
};
metrics::update_invalid_blocks(format!("unexpected message {kind}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind());
error!("unexpected message #{} -- {kind} (invalid order)", message.get_slot());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind());
panic!("unexpected message #{} -- {kind} (invalid order)", message.get_slot());
}
}
}
Expand Down
1 change: 0 additions & 1 deletion yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod config;
pub mod filters;
pub mod grpc;
pub mod message;
pub mod metrics;
pub mod plugin;
pub mod version;
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
sync::{mpsc, oneshot, Notify},
task::JoinHandle,
},
yellowstone_grpc_proto::prelude::CommitmentLevel,
yellowstone_grpc_proto::plugin::message::CommitmentLevel,
};

lazy_static::lazy_static! {
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use {
crate::{
config::Config,
grpc::GrpcService,
message::Message,
metrics::{self, PrometheusService},
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
Expand All @@ -22,6 +21,7 @@ use {
runtime::{Builder, Runtime},
sync::{mpsc, Notify},
},
yellowstone_grpc_proto::plugin::message::Message,
};

#[derive(Debug)]
Expand Down
14 changes: 12 additions & 2 deletions yellowstone-grpc-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = { workspace = true }
publish = true

[dependencies]
agave-geyser-plugin-interface = { workspace = true, optional = true }
bincode = { workspace = true, optional = true }
prost = { workspace = true }
solana-account-decoder = { workspace = true, optional = true }
Expand All @@ -26,8 +27,17 @@ tonic-build = { workspace = true }

[features]
default = ["convert", "tonic-compression"]
convert = ["dep:bincode", "dep:solana-account-decoder", "dep:solana-sdk", "dep:solana-transaction-status"]
plugin = ["dep:thiserror"]
convert = [
"dep:bincode",
"dep:solana-account-decoder",
"dep:solana-sdk",
"dep:solana-transaction-status"
]
plugin = [
"convert",
"dep:agave-geyser-plugin-interface",
"dep:thiserror"
]
tonic-compression = ["tonic/gzip", "tonic/zstd"]

[lints]
Expand Down
Loading

0 comments on commit 6d56e68

Please sign in to comment.