Skip to content

Commit

Permalink
Merge pull request ProvableHQ#1590 from AleoHQ/pool_endpoints
Browse files Browse the repository at this point in the history
Add extra RPC endpoints for operator
  • Loading branch information
howardwu authored Jan 27, 2022
2 parents 361599a + 0ee7c4c commit 67d539a
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 12 deletions.
7 changes: 7 additions & 0 deletions src/network/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ impl<N: Network, E: Environment> Operator<N, E> {
self.state.get_shares_for_prover(&self.ledger_reader, prover)
}

///
/// Returns a list of all provers which have submitted shares to this operator.
///
pub fn get_provers(&self) -> Vec<Address<N>> {
self.state.get_provers()
}

///
/// Performs the given `request` to the operator.
/// All requests must go through this `update`, so that a unified view is preserved.
Expand Down
13 changes: 12 additions & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,16 @@ impl<N: Network, E: Environment> Server<N, E> {
// Initialize a new instance of the heartbeat.
Self::initialize_heartbeat(peers.router(), ledger.reader(), ledger.router(), operator.router(), prover.router()).await;
// Initialize a new instance of the RPC server.
Self::initialize_rpc(node, address, &peers, ledger.reader(), prover.router(), prover.memory_pool()).await;
Self::initialize_rpc(
node,
address,
&peers,
ledger.reader(),
operator.clone(),
prover.router(),
prover.memory_pool(),
)
.await;
// Initialize a new instance of the notification.
Self::initialize_notification(ledger.reader(), prover.clone(), address).await;

Expand Down Expand Up @@ -334,6 +343,7 @@ impl<N: Network, E: Environment> Server<N, E> {
address: Option<Address<N>>,
peers: &Arc<Peers<N, E>>,
ledger_reader: LedgerReader<N>,
operator: Arc<Operator<N, E>>,
prover_router: ProverRouter<N>,
memory_pool: Arc<RwLock<MemoryPool<N>>>,
) {
Expand All @@ -347,6 +357,7 @@ impl<N: Network, E: Environment> Server<N, E> {
address,
peers,
ledger_reader,
operator,
prover_router,
memory_pool,
)
Expand Down
26 changes: 26 additions & 0 deletions src/rpc/documentation/public_endpoints/getprovers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Get Provers
Returns the Aleo addresses of all provers which have submitted shares to an operator.

### Arguments

None

### Response

| Parameter | Type | Description |
|:---------:|:------:|:---------------------------------------------------------------------:|
| `result` | array | All of the Aleo addresses which have submitted shares to the operator |

### Example Request
```ignore
curl --data-binary '{"jsonrpc": "2.0", "id":"1", "method": "getprovers", "params": [] }' -H 'content-type: application/json' http://127.0.0.1:3030/
```

### Example Response
```json
{
"jsonrpc":"2.0",
"result": ["aleo1...", "aleo1..."],
"id":"1"
}
```
26 changes: 26 additions & 0 deletions src/rpc/documentation/public_endpoints/getshares.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Get Shares
Returns the total number of shares submitted to an operator.

### Arguments

None

### Response

| Parameter | Type | Description |
|:---------:|:------:|:----------------------------------------------------:|
| `result` | u64 | The total number of shares submitted to the operator |

### Example Request
```ignore
curl --data-binary '{"jsonrpc": "2.0", "id":"1", "method": "getshares", "params": [] }' -H 'content-type: application/json' http://127.0.0.1:3030/
```

### Example Response
```json
{
"jsonrpc":"2.0",
"result":"46239",
"id":"1"
}
```
28 changes: 28 additions & 0 deletions src/rpc/documentation/public_endpoints/getsharesforprover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Get Shares For Prover
Returns the number of shares submitted by a prover, given their address.

### Arguments

| Parameter | Type | Required | Description |
|:---------:|:------:|:--------:|:------------------------------:|
| `prover` | string | Yes | The Aleo address of the prover |

### Response

| Parameter | Type | Description |
|:---------:|:------:|:--------------------------------------------:|
| `result` | u64 | The number of shares submitted by the prover |

### Example Request
```ignore
curl --data-binary '{"jsonrpc": "2.0", "id":"1", "method": "getsharesforprover", "params": ["aleo_address"] }' -H 'content-type: application/json' http://127.0.0.1:3030/
```

### Example Response
```json
{
"jsonrpc":"2.0",
"result":"581",
"id":"1"
}
```
69 changes: 59 additions & 10 deletions src/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Logic for instantiating the RPC server.
use crate::{
network::Operator,
rpc::{rpc_impl::RpcImpl, rpc_trait::RpcFunctions},
Environment,
LedgerReader,
Expand Down Expand Up @@ -54,7 +55,7 @@ pub struct Meta {

impl Metadata for Meta {}

const METHODS_EXPECTING_PARAMS: [&str; 12] = [
const METHODS_EXPECTING_PARAMS: [&str; 13] = [
// public
"getblock",
"getblocks",
Expand All @@ -68,6 +69,7 @@ const METHODS_EXPECTING_PARAMS: [&str; 12] = [
"gettransaction",
"gettransition",
"sendtransaction",
"getsharesforprover",
// // private
// "createtransaction",
// "getrawrecord",
Expand All @@ -85,11 +87,12 @@ pub async fn initialize_rpc_server<N: Network, E: Environment>(
address: Option<Address<N>>,
peers: &Arc<Peers<N, E>>,
ledger: LedgerReader<N>,
operator: Arc<Operator<N, E>>,
prover_router: ProverRouter<N>,
memory_pool: Arc<RwLock<MemoryPool<N>>>,
) -> tokio::task::JoinHandle<()> {
let credentials = RpcCredentials { username, password };
let rpc = RpcImpl::new(credentials, address, peers.clone(), ledger, prover_router, memory_pool);
let rpc = RpcImpl::new(credentials, address, peers.clone(), ledger, operator, prover_router, memory_pool);

let service = make_service_fn(move |conn: &AddrStream| {
let caller = conn.remote_addr();
Expand Down Expand Up @@ -366,6 +369,18 @@ async fn handle_rpc<N: Network, E: Environment>(
// .map_err(convert_core_err);
// result_to_response(&req, result)
// }
"getsharesforprover" => {
let result = rpc.get_shares_for_prover(params.remove(0)).await.map_err(convert_crate_err);
result_to_response(&req, result)
}
"getshares" => {
let result = rpc.get_shares().await;
result_to_response(&req, Ok(result))
}
"getprovers" => {
let result = rpc.get_provers().await;
result_to_response(&req, Ok(result))
}
_ => {
let err = jrt::Error::from_code(jrt::ErrorCode::MethodNotFound);
jrt::Response::error(jrt::Version::V2, err, req.id.clone())
Expand Down Expand Up @@ -466,9 +481,9 @@ mod tests {
};

// Derive the storage paths.
let (ledger_path, prover_path) = match &path {
Some(p) => (p.as_ref().to_path_buf(), temp_dir()),
None => (temp_dir(), temp_dir()),
let (ledger_path, prover_path, operator_storage_path) = match &path {
Some(p) => (p.as_ref().to_path_buf(), temp_dir(), temp_dir()),
None => (temp_dir(), temp_dir(), temp_dir()),
};

// Initialize the node.
Expand All @@ -483,7 +498,6 @@ mod tests {
let ledger = Ledger::<N, E>::open::<S, _>(&ledger_path, peers.router())
.await
.expect("Failed to initialize ledger");

// Initialize a new instance for managing the prover.
let prover = Prover::open::<S, _>(
&prover_path,
Expand All @@ -496,16 +510,37 @@ mod tests {
)
.await
.expect("Failed to initialize prover");
// Initialize a new instance for managing the operator.
let operator = Operator::open::<RocksDB, _>(
&operator_storage_path,
None,
local_ip,
prover.memory_pool(),
peers.router(),
ledger.reader(),
ledger.router(),
prover.router(),
)
.await
.expect("Failed to initialize operator");

RpcImpl::<N, E>::new(credentials, None, peers, ledger.reader(), prover.router(), prover.memory_pool())
RpcImpl::<N, E>::new(
credentials,
None,
peers,
ledger.reader(),
operator,
prover.router(),
prover.memory_pool(),
)
}

/// Initializes a new instance of the rpc.
async fn new_rpc_server<N: Network, E: Environment, S: Storage, P: AsRef<Path>>(path: Option<P>) {
// Derive the storage paths.
let (ledger_path, prover_path) = match &path {
Some(p) => (p.as_ref().to_path_buf(), temp_dir()),
None => (temp_dir(), temp_dir()),
let (ledger_path, prover_path, operator_storage_path) = match &path {
Some(p) => (p.as_ref().to_path_buf(), temp_dir(), temp_dir()),
None => (temp_dir(), temp_dir(), temp_dir()),
};

// Initialize the node.
Expand Down Expand Up @@ -533,6 +568,19 @@ mod tests {
)
.await
.expect("Failed to initialize prover");
// Initialize a new instance for managing the operator.
let operator = Operator::open::<RocksDB, _>(
&operator_storage_path,
None,
local_ip,
prover.memory_pool(),
peers.router(),
ledger.reader(),
ledger.router(),
prover.router(),
)
.await
.expect("Failed to initialize operator");

E::tasks().append(
initialize_rpc_server(
Expand All @@ -542,6 +590,7 @@ mod tests {
None,
&peers,
ledger.reader(),
operator,
prover.router(),
prover.memory_pool(),
)
Expand Down
22 changes: 22 additions & 0 deletions src/rpc/rpc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! See [RpcFunctions](../trait.RpcFunctions.html) for documentation of public endpoints.
use crate::{
network::Operator,
rpc::{rpc::*, rpc_trait::RpcFunctions},
Environment,
LedgerReader,
Expand Down Expand Up @@ -66,6 +67,7 @@ pub struct RpcInner<N: Network, E: Environment> {
address: Option<Address<N>>,
peers: Arc<Peers<N, E>>,
ledger: LedgerReader<N>,
operator: Arc<Operator<N, E>>,
prover_router: ProverRouter<N>,
memory_pool: Arc<RwLock<MemoryPool<N>>>,
/// RPC credentials for accessing guarded endpoints
Expand Down Expand Up @@ -93,13 +95,15 @@ impl<N: Network, E: Environment> RpcImpl<N, E> {
address: Option<Address<N>>,
peers: Arc<Peers<N, E>>,
ledger: LedgerReader<N>,
operator: Arc<Operator<N, E>>,
prover_router: ProverRouter<N>,
memory_pool: Arc<RwLock<MemoryPool<N>>>,
) -> Self {
Self(Arc::new(RpcInner {
address,
peers,
ledger,
operator,
prover_router,
memory_pool,
credentials,
Expand Down Expand Up @@ -337,4 +341,22 @@ impl<N: Network, E: Environment> RpcFunctions<N> for RpcImpl<N, E> {
}
Ok(transaction.transaction_id())
}

/// Returns the amount of shares submitted by a given prover.
async fn get_shares_for_prover(&self, prover: Value) -> Result<u64, RpcError> {
let prover: Address<N> = serde_json::from_value(prover)?;
Ok(self.operator.get_shares_for_prover(&prover))
}

/// Returns the amount of shares submitted to the operator in total.
async fn get_shares(&self) -> u64 {
let shares = self.operator.to_shares();
shares.iter().map(|(_, share)| share.values().sum::<u64>()).sum()
}

/// Returns a list of all provers that have submitted shares to the operator.
async fn get_provers(&self) -> Value {
let provers = self.operator.get_provers();
serde_json::json!(provers)
}
}
9 changes: 9 additions & 0 deletions src/rpc/rpc_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ pub trait RpcFunctions<N: Network> {

#[doc = include_str!("./documentation/public_endpoints/sendtransaction.md")]
async fn send_transaction(&self, transaction_bytes: String) -> Result<N::TransactionID, RpcError>;

#[doc = include_str!("./documentation/public_endpoints/getsharesforprover.md")]
async fn get_shares_for_prover(&self, prover: serde_json::Value) -> Result<u64, RpcError>;

#[doc = include_str!("./documentation/public_endpoints/getshares.md")]
async fn get_shares(&self) -> u64;

#[doc = include_str!("./documentation/public_endpoints/getprovers.md")]
async fn get_provers(&self) -> serde_json::Value;
}

// /// Definition of private RPC endpoints that require authentication.
Expand Down
21 changes: 20 additions & 1 deletion storage/src/state/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use crate::{
use snarkvm::dpc::prelude::*;

use anyhow::{anyhow, Result};
use std::{collections::HashMap, path::Path, sync::Arc};
use std::{
collections::{HashMap, HashSet},
iter::FromIterator,
path::Path,
sync::Arc,
};

#[derive(Debug)]
pub struct OperatorState<N: Network> {
Expand Down Expand Up @@ -76,6 +81,11 @@ impl<N: Network> OperatorState<N> {
pub fn remove_shares(&self, block_height: u32, coinbase_record: Record<N>) -> Result<()> {
self.shares.remove_shares(block_height, coinbase_record)
}

/// Returns a list of provers which have submitted shares to an operator.
pub fn get_provers(&self) -> Vec<Address<N>> {
self.shares.get_provers()
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -148,4 +158,13 @@ impl<N: Network> SharesState<N> {
fn remove_shares(&self, block_height: u32, coinbase_record: Record<N>) -> Result<()> {
self.shares.remove(&(block_height, coinbase_record))
}

fn get_provers(&self) -> Vec<Address<N>> {
let set: HashSet<Address<N>> = self
.shares
.iter()
.flat_map(|((_, _), shares)| shares.keys().copied().collect::<Vec<_>>())
.collect();
Vec::from_iter(set)
}
}

0 comments on commit 67d539a

Please sign in to comment.