Skip to content

Commit

Permalink
add get_object (MystenLabs#10375)
Browse files Browse the repository at this point in the history
## Description 

get_object to loadgen

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
wlmyng authored Apr 5, 2023
1 parent 7b6340d commit 73d30ba
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 28 deletions.
10 changes: 8 additions & 2 deletions crates/sui-rpc-loadgen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,20 @@ cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 4 multi-get-objects
```

### Get Object
```bash
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 2 get-object --chunk-size 20
```

### Get All Balances
```bash
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 4 get-all-balances
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 2 get-all-balances --chunk-size 20
```


### Get Reference Gas Price
```bash
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 4 get-reference-gas-price --num-chunks-per-thread 10
cargo run --bin sui-rpc-loadgen -- --urls "http://127.0.0.1:9000" "http://127.0.0.1:9000" --num-threads 2 get-reference-gas-price --num-chunks-per-thread 10
```

# Useful commands
Expand Down
23 changes: 21 additions & 2 deletions crates/sui-rpc-loadgen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,19 @@ pub enum ClapCommand {
#[clap(flatten)]
common: CommonOptions,
},
#[clap(name = "get-object")]
GetObject {
#[clap(long)]
chunk_size: usize,

#[clap(flatten)]
common: CommonOptions,
},
#[clap(name = "get-all-balances")]
GetAllBalances {
#[clap(long)]
chunk_size: usize,

#[clap(flatten)]
common: CommonOptions,
},
Expand Down Expand Up @@ -203,9 +214,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
false,
)
}
ClapCommand::GetAllBalances { common } => {
ClapCommand::GetAllBalances { common, chunk_size } => {
let addresses = load_addresses_from_file(expand_path(&opts.data_directory));
(Command::new_get_all_balances(addresses), common, false)
(
Command::new_get_all_balances(addresses, chunk_size),
common,
false,
)
}
ClapCommand::MultiGetObjects { common } => {
let objects = load_objects_from_file(expand_path(&opts.data_directory));
Expand All @@ -219,6 +234,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
false,
)
}
ClapCommand::GetObject { common, chunk_size } => {
let objects = load_objects_from_file(expand_path(&opts.data_directory));
(Command::new_get_object(objects, chunk_size), common, false)
}
};

let signer_info = need_keystore.then_some(get_keypair()?);
Expand Down
31 changes: 16 additions & 15 deletions crates/sui-rpc-loadgen/src/payload/get_all_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use futures::future::join_all;
use sui_json_rpc_types::Balance;
use sui_sdk::SuiClient;
use sui_types::base_types::SuiAddress;
use tracing::log::warn;

use super::validation::chunk_entities;

#[async_trait]
impl<'a> ProcessPayload<'a, &'a GetAllBalances> for RpcCommandProcessor {
Expand All @@ -17,24 +18,24 @@ impl<'a> ProcessPayload<'a, &'a GetAllBalances> for RpcCommandProcessor {
op: &'a GetAllBalances,
_signer_info: &Option<SignerInfo>,
) -> Result<()> {
let clients = self.get_clients().await?;
if op.addresses.is_empty() {
warn!("No addresses provided, skipping query");
return Ok(());
panic!("No addresses provided, skipping query");
}

let mut tasks = Vec::new();
for address in &op.addresses {
for client in clients.iter() {
let owner_address = *address;
let task = async move {
get_all_balances(client, owner_address).await.unwrap();
};
tasks.push(task);
let clients = self.get_clients().await?;
let chunked = chunk_entities(&op.addresses, Some(op.chunk_size));
for chunk in chunked {
let mut tasks = Vec::new();
for address in chunk {
for client in clients.iter() {
let owner_address = address;
let task = async move {
get_all_balances(client, owner_address).await.unwrap();
};
tasks.push(task);
}
}
join_all(tasks).await;
}
join_all(tasks).await;

Ok(())
}
}
Expand Down
51 changes: 51 additions & 0 deletions crates/sui-rpc-loadgen/src/payload/get_object.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use futures::future::join_all;
use sui_json_rpc_types::{SuiObjectDataOptions, SuiObjectResponse};
use sui_sdk::SuiClient;
use sui_types::base_types::ObjectID;

use crate::payload::{GetObject, ProcessPayload, RpcCommandProcessor, SignerInfo};
use async_trait::async_trait;

use super::validation::chunk_entities;

#[async_trait]
impl<'a> ProcessPayload<'a, &'a GetObject> for RpcCommandProcessor {
async fn process(&'a self, op: &'a GetObject, _signer_info: &Option<SignerInfo>) -> Result<()> {
if op.object_ids.is_empty() {
panic!("No object ids provided, skipping query");
};
let clients = self.get_clients().await?;
let chunked = chunk_entities(&op.object_ids, Some(op.chunk_size));

for chunk in chunked {
let mut tasks = Vec::new();
for object_id in chunk {
for client in clients.iter() {
let task = async move {
get_object(client, object_id).await.unwrap();
};
tasks.push(task);
}
}
join_all(tasks).await;
}
Ok(())
}
}

// TODO: should organize these into an api_calls.rs
pub(crate) async fn get_object(
client: &SuiClient,
object_id: ObjectID,
) -> Result<SuiObjectResponse> {
let result = client
.read_api()
.get_object_with_options(object_id, SuiObjectDataOptions::full_content())
.await
.unwrap();
Ok(result)
}
27 changes: 25 additions & 2 deletions crates/sui-rpc-loadgen/src/payload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod checkpoint_utils;
mod get_all_balances;
mod get_checkpoints;
mod get_object;
mod get_reference_gas_price;
mod multi_get_objects;
mod pay_sui;
Expand Down Expand Up @@ -116,8 +117,22 @@ impl Command {
}
}

pub fn new_get_all_balances(addresses: Vec<SuiAddress>) -> Self {
let get_all_balances = GetAllBalances { addresses };
pub fn new_get_object(object_ids: Vec<ObjectID>, chunk_size: usize) -> Self {
let get_object = GetObject {
object_ids,
chunk_size,
};
Self {
data: CommandData::GetObject(get_object),
..Default::default()
}
}

pub fn new_get_all_balances(addresses: Vec<SuiAddress>, chunk_size: usize) -> Self {
let get_all_balances = GetAllBalances {
addresses,
chunk_size,
};
Self {
data: CommandData::GetAllBalances(get_all_balances),
..Default::default()
Expand Down Expand Up @@ -151,6 +166,7 @@ pub enum CommandData {
PaySui(PaySui),
QueryTransactionBlocks(QueryTransactionBlocks),
MultiGetObjects(MultiGetObjects),
GetObject(GetObject),
GetAllBalances(GetAllBalances),
GetReferenceGasPrice(GetReferenceGasPrice),
}
Expand Down Expand Up @@ -203,9 +219,16 @@ pub struct MultiGetObjects {
pub object_ids: Vec<ObjectID>,
}

#[derive(Clone)]
pub struct GetObject {
pub object_ids: Vec<ObjectID>,
pub chunk_size: usize,
}

#[derive(Clone)]
pub struct GetAllBalances {
pub addresses: Vec<SuiAddress>,
pub chunk_size: usize,
}

#[derive(Clone)]
Expand Down
58 changes: 51 additions & 7 deletions crates/sui-rpc-loadgen/src/payload/rpc_command_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use sui_types::messages::{ExecuteTransactionRequestType, Transaction, Transactio
use crate::payload::checkpoint_utils::get_latest_checkpoint_stats;
use crate::payload::validation::chunk_entities;
use crate::payload::{
Command, CommandData, DryRun, GetAllBalances, GetCheckpoints, Payload, ProcessPayload,
Processor, QueryTransactionBlocks, SignerInfo,
Command, CommandData, DryRun, GetAllBalances, GetCheckpoints, GetObject, MultiGetObjects,
Payload, ProcessPayload, Processor, QueryTransactionBlocks, SignerInfo,
};

pub(crate) const DEFAULT_GAS_BUDGET: u64 = 500_000_000;
Expand Down Expand Up @@ -81,6 +81,7 @@ impl RpcCommandProcessor {
CommandData::PaySui(ref v) => self.process(v, signer_info).await,
CommandData::QueryTransactionBlocks(ref v) => self.process(v, signer_info).await,
CommandData::MultiGetObjects(ref v) => self.process(v, signer_info).await,
CommandData::GetObject(ref v) => self.process(v, signer_info).await,
CommandData::GetAllBalances(ref v) => self.process(v, signer_info).await,
CommandData::GetReferenceGasPrice(ref v) => self.process(v, signer_info).await,
}
Expand Down Expand Up @@ -271,6 +272,20 @@ impl Processor for RpcCommandProcessor {
divide_get_all_balances_tasks(data, config.num_threads).await
}
}
CommandData::MultiGetObjects(data) => {
if !config.divide_tasks {
vec![config.command.clone(); config.num_threads]
} else {
divide_multi_get_objects_tasks(data, config.num_threads).await
}
}
CommandData::GetObject(data) => {
if !config.divide_tasks {
vec![config.command.clone(); config.num_threads]
} else {
divide_get_object_tasks(data, config.num_threads).await
}
}
_ => vec![config.command.clone(); config.num_threads],
};

Expand Down Expand Up @@ -446,16 +461,45 @@ async fn divide_query_transaction_blocks_tasks(
.collect()
}

async fn divide_get_all_balances_tasks(data: &GetAllBalances, num_chunks: usize) -> Vec<Command> {
let chunk_size = if data.addresses.len() < num_chunks {
async fn divide_get_all_balances_tasks(data: &GetAllBalances, num_threads: usize) -> Vec<Command> {
let per_thread_size = if data.addresses.len() < num_threads {
1
} else {
data.addresses.len() as u64 / num_chunks as u64
data.addresses.len() / num_threads
};
let chunked = chunk_entities(data.addresses.as_slice(), Some(chunk_size as usize));

let chunked = chunk_entities(data.addresses.as_slice(), Some(per_thread_size));
chunked
.into_iter()
.map(|chunk| Command::new_get_all_balances(chunk, data.chunk_size))
.collect()
}

// TODO: probs can do generic divide tasks
async fn divide_multi_get_objects_tasks(data: &MultiGetObjects, num_chunks: usize) -> Vec<Command> {
let chunk_size = if data.object_ids.len() < num_chunks {
1
} else {
data.object_ids.len() as u64 / num_chunks as u64
};
let chunked = chunk_entities(data.object_ids.as_slice(), Some(chunk_size as usize));
chunked
.into_iter()
.map(Command::new_multi_get_objects)
.collect()
}

async fn divide_get_object_tasks(data: &GetObject, num_threads: usize) -> Vec<Command> {
let per_thread_size = if data.object_ids.len() < num_threads {
1
} else {
data.object_ids.len() / num_threads
};

let chunked = chunk_entities(data.object_ids.as_slice(), Some(per_thread_size));
chunked
.into_iter()
.map(Command::new_get_all_balances)
.map(|chunk| Command::new_get_object(chunk, data.chunk_size))
.collect()
}

Expand Down

0 comments on commit 73d30ba

Please sign in to comment.