Skip to content

Commit

Permalink
tpu-client: Move send_messages_with_spinner from program (solana-la…
Browse files Browse the repository at this point in the history
…bs#20960)

We have too many ways of sending transactions, and too many
reimplementations of the same logic all over the place.

The program deploy logic and stake-o-matic currently make the
most use of the TPU client, so this merges their implementations into
one place to be reused by both.  Yay for consolidation!
  • Loading branch information
joncinque authored Oct 26, 2021
1 parent 4d6190a commit 5f7b605
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 173 deletions.
173 changes: 15 additions & 158 deletions cli/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
use solana_bpf_loader_program::{syscalls::register_syscalls, BpfError, ThisInstructionMeter};
use solana_clap_utils::{self, input_parsers::*, input_validators::*, keypair::*};
use solana_cli_output::{
display::new_spinner_progress_bar, CliProgram, CliProgramAccountType, CliProgramAuthority,
CliProgramBuffer, CliProgramId, CliUpgradeableBuffer, CliUpgradeableBuffers,
CliUpgradeableProgram, CliUpgradeableProgramClosed, CliUpgradeablePrograms,
CliProgram, CliProgramAccountType, CliProgramAuthority, CliProgramBuffer, CliProgramId,
CliUpgradeableBuffer, CliUpgradeableBuffers, CliUpgradeableProgram,
CliUpgradeableProgramClosed, CliUpgradeablePrograms,
};
use solana_client::{
client_error::ClientErrorKind,
rpc_client::RpcClient,
rpc_config::RpcSendTransactionConfig,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
tpu_client::{TpuClient, TpuClientConfig},
};
use solana_rbpf::{
Expand All @@ -44,24 +43,18 @@ use solana_sdk::{
process_instruction::MockInvokeContext,
pubkey::Pubkey,
signature::{keypair_from_seed, read_keypair_file, Keypair, Signature, Signer},
signers::Signers,
system_instruction::{self, SystemError},
system_program,
transaction::Transaction,
transaction::TransactionError,
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::{
collections::HashMap,
error,
fs::File,
io::{Read, Write},
mem::size_of,
path::PathBuf,
str::FromStr,
sync::Arc,
thread::sleep,
time::Duration,
};

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -2121,16 +2114,20 @@ fn send_deploy_messages(
if let Some(write_messages) = write_messages {
if let Some(write_signer) = write_signer {
trace!("Writing program data");
let transaction_errors = send_and_confirm_messages_with_spinner(
let tpu_client = TpuClient::new(
rpc_client.clone(),
&config.websocket_url,
write_messages,
&[payer_signer, write_signer],
)
.map_err(|err| format!("Data writes to account failed: {}", err))?
.into_iter()
.flatten()
.collect::<Vec<_>>();
TpuClientConfig::default(),
)?;
let transaction_errors = tpu_client
.send_and_confirm_messages_with_spinner(
write_messages,
&[payer_signer, write_signer],
)
.map_err(|err| format!("Data writes to account failed: {}", err))?
.into_iter()
.flatten()
.collect::<Vec<_>>();

if !transaction_errors.is_empty() {
for transaction_error in &transaction_errors {
Expand Down Expand Up @@ -2200,146 +2197,6 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) {
);
}

fn send_and_confirm_messages_with_spinner<T: Signers>(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
messages: &[Message],
signers: &T,
) -> Result<Vec<Option<TransactionError>>, Box<dyn error::Error>> {
let commitment = rpc_client.commitment();

let progress_bar = new_spinner_progress_bar();
let send_transaction_interval = Duration::from_millis(10); /* ~100 TPS */
let mut send_retries = 5;

let (blockhash, mut last_valid_block_height) =
rpc_client.get_latest_blockhash_with_commitment(commitment)?;

let mut transactions = vec![];
let mut transaction_errors = vec![None; messages.len()];
for (i, message) in messages.iter().enumerate() {
let mut transaction = Transaction::new_unsigned(message.clone());
transaction.try_sign(signers, blockhash)?;
transactions.push((i, transaction));
}

progress_bar.set_message("Finding leader nodes...");
let tpu_client = TpuClient::new(
rpc_client.clone(),
websocket_url,
TpuClientConfig::default(),
)?;
loop {
// Send all transactions
let mut pending_transactions = HashMap::new();
let num_transactions = transactions.len();
for (i, transaction) in transactions {
if !tpu_client.send_transaction(&transaction) {
let _result = rpc_client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
preflight_commitment: Some(commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
.ok();
}
pending_transactions.insert(transaction.signatures[0], (i, transaction));
progress_bar.set_message(format!(
"[{}/{}] Transactions sent",
pending_transactions.len(),
num_transactions
));

sleep(send_transaction_interval);
}

// Collect statuses for all the transactions, drop those that are confirmed
loop {
let mut block_height = 0;
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
for pending_signatures_chunk in
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
{
if let Ok(result) = rpc_client.get_signature_statuses(pending_signatures_chunk) {
let statuses = result.value;
for (signature, status) in
pending_signatures_chunk.iter().zip(statuses.into_iter())
{
if let Some(status) = status {
if let Some(confirmation_status) = &status.confirmation_status {
if *confirmation_status != TransactionConfirmationStatus::Processed
{
if let Some((i, _)) = pending_transactions.remove(signature) {
transaction_errors[i] = status.err;
}
}
} else if status.confirmations.is_none()
|| status.confirmations.unwrap() > 1
{
if let Some((i, _)) = pending_transactions.remove(signature) {
transaction_errors[i] = status.err;
}
}
}
}
}

block_height = rpc_client.get_block_height()?;
progress_bar.set_message(format!(
"[{}/{}] Transactions confirmed. Retrying in {} blocks",
num_transactions - pending_transactions.len(),
num_transactions,
last_valid_block_height.saturating_sub(block_height)
));
}

if pending_transactions.is_empty() {
return Ok(transaction_errors);
}

if block_height > last_valid_block_height {
break;
}

for (_i, transaction) in pending_transactions.values() {
if !tpu_client.send_transaction(transaction) {
let _result = rpc_client
.send_transaction_with_config(
transaction,
RpcSendTransactionConfig {
preflight_commitment: Some(commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
.ok();
}
}

if cfg!(not(test)) {
// Retry twice a second
sleep(Duration::from_millis(500));
}
}

if send_retries == 0 {
return Err("Transactions failed".into());
}
send_retries -= 1;

// Re-sign any failed transactions with a new blockhash and retry
let (blockhash, new_last_valid_block_height) =
rpc_client.get_latest_blockhash_with_commitment(commitment)?;
last_valid_block_height = new_last_valid_block_height;
transactions = vec![];
for (_, (i, mut transaction)) in pending_transactions.into_iter() {
transaction.try_sign(signers, blockhash)?;
transactions.push((i, transaction));
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod rpc_filter;
pub mod rpc_request;
pub mod rpc_response;
pub mod rpc_sender;
pub mod spinner;
pub mod thin_client;
pub mod tpu_client;
pub mod transaction_executor;
2 changes: 1 addition & 1 deletion client/src/mock_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl RpcSender for MockSender {
context: RpcResponseContext { slot: 1 },
value: RpcBlockhash {
blockhash: PUBKEY.to_string(),
last_valid_block_height: 0,
last_valid_block_height: 1234,
},
})?,
"getFeeForMessage" => serde_json::to_value(Response {
Expand Down
12 changes: 2 additions & 10 deletions client/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use {
rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
rpc_response::*,
rpc_sender::*,
spinner,
},
bincode::serialize,
indicatif::{ProgressBar, ProgressStyle},
log::*,
serde_json::{json, Value},
solana_account_decoder::{
Expand Down Expand Up @@ -1062,7 +1062,7 @@ impl RpcClient {
};
let mut confirmations = 0;

let progress_bar = new_spinner_progress_bar();
let progress_bar = spinner::new_progress_bar();

progress_bar.set_message(format!(
"[{}/{}] Finalizing transaction {}",
Expand Down Expand Up @@ -4876,14 +4876,6 @@ pub struct GetConfirmedSignaturesForAddress2Config {
pub commitment: Option<CommitmentConfig>,
}

fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}

fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String {
if tls {
format!("https://{}", rpc_addr)
Expand Down
11 changes: 11 additions & 0 deletions client/src/spinner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Spinner creator
use indicatif::{ProgressBar, ProgressStyle};

pub(crate) fn new_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
Loading

0 comments on commit 5f7b605

Please sign in to comment.