This repository has been archived by the owner on Jun 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 120
/
Copy pathpending_bundle.rs
146 lines (129 loc) · 5.05 KB
/
pending_bundle.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use crate::bundle::BundleHash;
use ethers::core::types::{Block, TxHash, U64};
use ethers::providers::{
interval, JsonRpcClient, Middleware, Provider, ProviderError, DEFAULT_POLL_INTERVAL,
};
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
/// A pending bundle is one that has been submitted to a relay,
/// but not yet included.
///
/// You can `await` the pending bundle. When the target block of the
/// bundle has been included in the chain the future will resolve,
/// either with the bundle hash indicating that the bundle was
/// included in the target block, or with an error indicating
/// that the bundle was not included in the target block.
///
/// To figure out why your bundle was not included, refer to the
/// [Flashbots documentation][fb_debug].
///
/// [fb_debug]: https://docs.flashbots.net/flashbots-auction/searchers/faq/#why-didnt-my-transaction-get-included
// No field below is marked `#[pin]`; the `Future` impl reaches every field
// through `self.project()`.
#[pin_project]
pub struct PendingBundle<'a, P> {
/// Hash of the bundle, if one is known. This exact value is what the
/// future yields inside `Ok` when the bundle is found in the target block.
pub bundle_hash: Option<BundleHash>,
/// Number of the target block that is fetched to check for inclusion.
pub block: U64,
/// Hashes of the bundle's transactions; all of them must appear in the
/// target block for the bundle to count as included.
pub transactions: Vec<TxHash>,
/// Provider used to fetch the target block.
provider: &'a Provider<P>,
/// Current step of the polling state machine driven by the `Future` impl.
state: PendingBundleState<'a>,
/// Stream that is awaited before each block request is issued
/// (created from `DEFAULT_POLL_INTERVAL` in `new`).
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
}
impl<'a, P: JsonRpcClient> PendingBundle<'a, P> {
pub fn new(
bundle_hash: Option<BundleHash>,
block: U64,
transactions: Vec<TxHash>,
provider: &'a Provider<P>,
) -> Self {
Self {
bundle_hash,
block,
transactions,
provider,
state: PendingBundleState::PausedGettingBlock,
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
}
}
/// Get the bundle hash for this pending bundle.
#[deprecated(note = "use the bundle_hash field instead")]
pub fn bundle_hash(&self) -> Option<BundleHash> {
self.bundle_hash
}
}
impl<'a, P: JsonRpcClient> Future for PendingBundle<'a, P> {
    /// `Ok` carries the stored bundle hash when every transaction of the
    /// bundle is present in the target block; `Err` reports that the bundle
    /// was not included.
    type Output = Result<Option<BundleHash>, PendingBundleError>;

    /// Drives the polling state machine.
    ///
    /// The future alternates between waiting for the poll interval and
    /// fetching the target block. Provider errors, a missing block and a
    /// still-pending block (no block number) all send it back to waiting.
    /// Once a mined block is obtained, the future completes with the result
    /// of the inclusion check.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        match this.state {
            PendingBundleState::PausedGettingBlock => {
                // Hold off until the interval produces its next item.
                futures_util::ready!(this.interval.poll_next_unpin(ctx));

                let fut = Box::pin(this.provider.get_block(*this.block));
                *this.state = PendingBundleState::GettingBlock(fut);

                // The freshly created request has not been polled yet, so it
                // cannot have registered our waker; ask to be polled again
                // right away so the request actually starts.
                ctx.waker().wake_by_ref();
                Poll::Pending
            }
            PendingBundleState::GettingBlock(fut) => {
                let block_res = futures_util::ready!(fut.as_mut().poll(ctx));

                // Only a successfully fetched, already-mined block lets us
                // decide. A provider error, a block that does not exist yet,
                // or a pending block (no number) means: pause and try again.
                let block = match block_res {
                    Ok(Some(block)) if block.number.is_some() => block,
                    _ => {
                        *this.state = PendingBundleState::PausedGettingBlock;
                        ctx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                };

                // Check if all transactions of the bundle are present in the block
                let included = this
                    .transactions
                    .iter()
                    .all(|tx_hash| block.transactions.contains(tx_hash));

                *this.state = PendingBundleState::Completed;

                if included {
                    Poll::Ready(Ok(*this.bundle_hash))
                } else {
                    Poll::Ready(Err(PendingBundleError::BundleNotIncluded))
                }
            }
            PendingBundleState::Completed => {
                panic!("polled pending bundle future after completion")
            }
        }
    }
}
/// Errors for pending bundles.
#[derive(Error, Debug)]
pub enum PendingBundleError {
/// The bundle was not included in the target block.
#[error("Bundle was not included in target block")]
BundleNotIncluded,
/// An error occurred while interacting with the RPC endpoint.
// NOTE(review): the `Future` impl for `PendingBundle` in this file retries
// on provider errors instead of returning this variant — confirm whether
// any other code path constructs it.
#[error(transparent)]
ProviderError(#[from] ProviderError),
}
/// A pinned, boxed, `Send` future valid for `'a` that resolves to either a
/// `T` or a `ProviderError`.
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
/// Steps of the polling state machine behind `PendingBundle`.
enum PendingBundleState<'a> {
/// Waiting for an interval before calling API again
// Initial state set by `PendingBundle::new`; also re-entered whenever a
// block request fails or returns no mined block.
PausedGettingBlock,
/// Polling the blockchain to get block information
// Holds the in-flight block request created from the provider.
GettingBlock(PinBoxFut<'a, Option<Block<TxHash>>>),
/// Future has completed
// Polling the bundle again in this state panics.
Completed,
}