-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathmempool_collector.rs
161 lines (140 loc) · 4.99 KB
/
mempool_collector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use std::sync::Arc;
use crate::types::{Collector, CollectorStream};
use alloy::transports::{RpcError, Transport, TransportErrorKind};
use alloy::{
primitives::B256, providers::Provider, pubsub::PubSubFrontend, rpc::types::eth::Transaction,
};
use async_trait::async_trait;
use eyre::WrapErr;
use futures::prelude::{stream::FuturesUnordered, Stream};
use futures::{FutureExt, StreamExt};
use std::future::Future;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};
use tracing::error;
/// A collector that subscribes to the provider's pending-transaction feed and
/// yields the corresponding full [`Transaction`]s.
pub struct MempoolCollector {
/// Shared pub-sub provider used both for the subscription and for fetching
/// each transaction by hash.
provider: Arc<dyn Provider<PubSubFrontend>>,
}
impl MempoolCollector {
pub fn new(provider: Arc<dyn Provider<PubSubFrontend>>) -> Self {
Self { provider }
}
}
#[async_trait]
impl Collector<Transaction> for MempoolCollector {
fn name(&self) -> &str {
"MempoolCollector"
}
async fn get_event_stream(&self) -> eyre::Result<CollectorStream<'_, Transaction>> {
let stream = self
.provider
.subscribe_pending_transactions()
.await
.wrap_err("fail to subscribe to pending transaction stream")?
.into_stream();
let stream = TransactionStream::new(self.provider.as_ref(), stream, 256);
let stream = stream.filter_map(|res| async move { res.ok() });
Ok(Box::pin(stream))
}
}
/// Errors that a `TransactionStream` can yield when resolving a transaction
/// hash to a full transaction.
#[derive(Debug, thiserror::Error)]
pub enum GetTransactionError {
/// The request for the transaction with the given hash failed; carries the
/// hash and the underlying RPC error.
#[error("Failed to get transaction `{0}`: {1}")]
ProviderError(B256, RpcError<TransportErrorKind>),
/// The request succeeded but returned `None` for the given hash.
#[error("Transaction `{0}` not found")]
NotFound(B256),
}
/// A boxed, pinned, `Send` future resolving to a [`TransactionResult`]; the
/// element type stored in `TransactionStream::pending`.
pub(crate) type TransactionFut<'a> = Pin<Box<dyn Future<Output = TransactionResult> + Send + 'a>>;
/// Outcome of fetching a single transaction by hash.
pub(crate) type TransactionResult = Result<Transaction, GetTransactionError>;
/// Drains a stream of transaction hashes and yields the full `Transaction`
/// for each hash, or the error encountered while fetching it.
///
/// Results are yielded in completion order, not in the order the hashes
/// arrived, because the lookups run in a `FuturesUnordered`.
#[must_use = "streams do nothing unless polled"]
pub struct TransactionStream<'a, T, St> {
/// Currently running futures pending completion.
pub(crate) pending: FuturesUnordered<TransactionFut<'a>>,
/// Hashes waiting for a free slot; a lookup is started for them once
/// `pending` holds fewer than `max_concurrent` futures.
pub(crate) buffered: VecDeque<B256>,
/// The provider used to fetch each transaction.
pub(crate) provider: &'a dyn Provider<T>,
/// A stream of transaction hashes.
pub(crate) stream: St,
/// Set once `stream` has returned `None`; it is not polled again afterwards.
stream_done: bool,
/// Maximum number of futures allowed to execute at once.
pub(crate) max_concurrent: usize,
}
impl<'a, T: Clone + Transport, St> TransactionStream<'a, T, St> {
    /// Create a new `TransactionStream` instance.
    ///
    /// * `provider` - used to fetch each transaction by hash.
    /// * `stream` - the source of transaction hashes.
    /// * `max_concurrent` - maximum number of lookups in flight at once.
    ///
    /// A `max_concurrent` of `0` is raised to `1`. With a zero limit no lookup
    /// could ever be started: every hash would sit in the buffer and be
    /// discarded once the source stream ended.
    pub fn new(provider: &'a dyn Provider<T>, stream: St, max_concurrent: usize) -> Self {
        Self {
            pending: Default::default(),
            buffered: Default::default(),
            provider,
            stream,
            stream_done: false,
            max_concurrent: max_concurrent.max(1),
        }
    }

    /// Start fetching the transaction with hash `tx` and add the lookup to the
    /// set of pending futures.
    ///
    /// The future resolves to the transaction on success, to
    /// [`GetTransactionError::NotFound`] when the provider returns `None`, and
    /// to [`GetTransactionError::ProviderError`] when the request itself fails.
    /// This method does not check `max_concurrent`; callers are expected to.
    pub(crate) fn push_tx(&mut self, tx: B256) {
        let lookup = self
            .provider
            .root()
            .raw_request::<_, Option<Transaction>>("eth_getTransactionByHash".into(), (tx,))
            // The response is converted synchronously, so `map` suffices; there
            // is no need to chain a second, already-ready future.
            .map(move |res| match res {
                Ok(Some(transaction)) => Ok(transaction),
                Ok(None) => Err(GetTransactionError::NotFound(tx)),
                Err(err) => Err(GetTransactionError::ProviderError(tx, err)),
            });
        self.pending.push(Box::pin(lookup));
    }
}
impl<'a, T, St> Stream for TransactionStream<'a, T, St>
where
    T: Clone + Transport,
    St: Stream<Item = B256> + Unpin + 'a,
{
    type Item = TransactionResult;

    /// Advances the stream in three steps:
    ///
    /// 1. start lookups for buffered hashes while there is spare capacity,
    /// 2. pull every hash the source stream has ready, starting a lookup or
    ///    buffering the hash depending on capacity,
    /// 3. poll the in-flight lookups and yield the first finished result.
    ///
    /// Yields `None` once the source stream has ended and no lookup remains
    /// in flight.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.get_mut();

        // Step 1: previously buffered hashes take priority over new ones.
        while me.pending.len() < me.max_concurrent {
            let Some(hash) = me.buffered.pop_front() else {
                break;
            };
            me.push_tx(hash);
        }

        // Step 2: drain whatever the source has ready right now. The loop ends
        // when the source is pending or, via the loop condition, exhausted.
        while !me.stream_done {
            match me.stream.poll_next_unpin(cx) {
                Poll::Ready(Some(hash)) if me.pending.len() < me.max_concurrent => {
                    me.push_tx(hash)
                }
                Poll::Ready(Some(hash)) => me.buffered.push_back(hash),
                Poll::Ready(None) => me.stream_done = true,
                Poll::Pending => break,
            }
        }

        // Step 3: poll the running lookups; anything other than a finished
        // result means either "all done" or "nothing ready yet".
        match me.pending.poll_next_unpin(cx) {
            finished @ Poll::Ready(Some(_)) => finished,
            _ if me.stream_done && me.pending.is_empty() => Poll::Ready(None),
            _ => Poll::Pending,
        }
    }
}