Skip to content

Commit

Permalink
Document the pubsub client APIs. (solana-labs#27188)
Browse files Browse the repository at this point in the history
* Document the pubsub client APIs.

* Apply suggestions from code review

Co-authored-by: Tyera Eulberg <[email protected]>

* Update client/src/nonblocking/pubsub_client.rs

Co-authored-by: Tyera Eulberg <[email protected]>

* Update pubsub_client docs

* Link pubsub docs to solana-rpc docs

* fmt

* Update solana_client references

Co-authored-by: Tyera Eulberg <[email protected]>
Co-authored-by: Tyera Eulberg <[email protected]>
  • Loading branch information
3 people authored Sep 15, 2022
1 parent 1d5314b commit 0c1ff7c
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/src/developing/clients/jsonrpc-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4184,7 +4184,7 @@ Result:

### programSubscribe

Subscribe to a program to receive notifications when the lamports or data for a given account owned by the program changes
Subscribe to a program to receive notifications when the lamports or data for an account owned by the given program changes

#### Parameters:

Expand Down Expand Up @@ -4352,7 +4352,7 @@ Result:

### signatureSubscribe

Subscribe to a transaction signature to receive notification when the transaction is confirmed On `signatureNotification`, the subscription is automatically cancelled
Subscribe to a transaction signature to receive notification when a given transaction is committed. On `signatureNotification`, the subscription is automatically cancelled.

#### Parameters:

Expand Down
1 change: 1 addition & 0 deletions pubsub-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tungstenite = { version = "0.17.2", features = ["rustls-tls-webpki-roots"] }
url = "2.2.2"

[dev-dependencies]
anyhow = "1.0.58"

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
272 changes: 272 additions & 0 deletions pubsub-client/src/nonblocking/pubsub_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,171 @@
//! A client for subscribing to messages from the RPC server.
//!
//! The [`PubsubClient`] implements [Solana WebSocket event
//! subscriptions][spec].
//!
//! [spec]: https://docs.solana.com/developing/clients/jsonrpc-api#subscription-websocket
//!
//! This is a nonblocking (async) API. For a blocking API use the synchronous
//! client in [`crate::pubsub_client`].
//!
//! A single `PubsubClient` client may be used to subscribe to many events via
//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
//! return a [`PubsubClientResult`] of a pair, the first element being a
//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
//! unsubscribe closure, an asynchronous function that can be called and
//! `await`ed to unsubscribe.
//!
//! Note that `BoxStream` contains an immutable reference to the `PubsubClient`
//! that created it. This makes `BoxStream` not `Send`, forcing it to stay in
//! the same task as its `PubsubClient`. `PubsubClient` though is `Send` and
//! `Sync`, and can be shared between tasks by putting it in an `Arc`. Thus
//! one viable pattern to creating multiple subscriptions is:
//!
//! - create an `Arc<PubsubClient>`
//! - spawn one task for each subscription, sharing the `PubsubClient`.
//! - in each task:
//! - create a subscription
//! - send the `UnsubscribeFn` to another task to handle shutdown
//! - loop while receiving messages from the subscription
//!
//! This pattern is illustrated in the example below.
//!
//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
//! disabled on RPC nodes. They can be enabled by passing
//! `--rpc-pubsub-enable-block-subscription` and
//! `--rpc-pubsub-enable-vote-subscription` to `solana-validator`. When these
//! methods are disabled, the RPC server will return a "Method not found" error
//! message.
//!
//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
//!
//! # Examples
//!
//! Demo two async `PubsubClient` subscriptions with clean shutdown.
//!
//! This spawns a task for each subscription type, each of which subscribes and
//! sends back a ready message and an unsubscribe channel (closure), then loops
//! on printing messages. The main task then waits for user input before
//! unsubscribing and waiting on the tasks.
//!
//! ```
//! use anyhow::Result;
//! use futures_util::StreamExt;
//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
//! use std::sync::Arc;
//! use tokio::io::AsyncReadExt;
//! use tokio::sync::mpsc::unbounded_channel;
//!
//! pub async fn watch_subscriptions(
//! websocket_url: &str,
//! ) -> Result<()> {
//!
//! // Subscription tasks will send a ready signal when they have subscribed.
//! let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
//!
//! // Channel to receive unsubscribe channels (actually closures).
//! // These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>), &'static str)`,
//! // where the first is a closure to call to unsubscribe, the second is the subscription name.
//! let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
//!
//! // The `PubsubClient` must be `Arc`ed to share it across tasks.
//! let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
//!
//! let mut join_handles = vec![];
//!
//! join_handles.push(("slot", tokio::spawn({
//! // Clone things we need before moving their clones into the `async move` block.
//! //
//! // The subscriptions have to be made from the tasks that will receive the subscription messages,
//! // because the subscription streams hold a reference to the `PubsubClient`.
//! // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
//!
//! let ready_sender = ready_sender.clone();
//! let unsubscribe_sender = unsubscribe_sender.clone();
//! let pubsub_client = Arc::clone(&pubsub_client);
//! async move {
//! let (mut slot_notifications, slot_unsubscribe) =
//! pubsub_client.slot_subscribe().await?;
//!
//! // With the subscription started,
//! // send a signal back to the main task for synchronization.
//! ready_sender.send(()).expect("channel");
//!
//! // Send the unsubscribe closure back to the main task.
//! unsubscribe_sender.send((slot_unsubscribe, "slot"))
//! .map_err(|e| format!("{}", e)).expect("channel");
//!
//! // Drop senders so that the channels can close.
//! // The main task will receive until channels are closed.
//! drop((ready_sender, unsubscribe_sender));
//!
//! // Do something with the subscribed messages.
//! // This loop will end once the main task unsubscribes.
//! while let Some(slot_info) = slot_notifications.next().await {
//! println!("------------------------------------------------------------");
//! println!("slot pubsub result: {:?}", slot_info);
//! }
//!
//! // This type hint is necessary to allow the `async move` block to use `?`.
//! Ok::<_, anyhow::Error>(())
//! }
//! })));
//!
//! join_handles.push(("root", tokio::spawn({
//! let ready_sender = ready_sender.clone();
//! let unsubscribe_sender = unsubscribe_sender.clone();
//! let pubsub_client = Arc::clone(&pubsub_client);
//! async move {
//! let (mut root_notifications, root_unsubscribe) =
//! pubsub_client.root_subscribe().await?;
//!
//! ready_sender.send(()).expect("channel");
//! unsubscribe_sender.send((root_unsubscribe, "root"))
//! .map_err(|e| format!("{}", e)).expect("channel");
//! drop((ready_sender, unsubscribe_sender));
//!
//! while let Some(root) = root_notifications.next().await {
//! println!("------------------------------------------------------------");
//! println!("root pubsub result: {:?}", root);
//! }
//!
//! Ok::<_, anyhow::Error>(())
//! }
//! })));
//!
//! // Drop these senders so that the channels can close
//! // and their receivers return `None` below.
//! drop(ready_sender);
//! drop(unsubscribe_sender);
//!
//! // Wait until all subscribers are ready before proceeding with application logic.
//! while let Some(_) = ready_receiver.recv().await { }
//!
//! // Do application logic here.
//!
//! // Wait for input or some application-specific shutdown condition.
//! tokio::io::stdin().read_u8().await?;
//!
//! // Unsubscribe from everything, which will shutdown all the tasks.
//! while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
//! println!("unsubscribing from {}", name);
//! unsubscribe().await
//! }
//!
//! // Wait for the tasks.
//! for (name, handle) in join_handles {
//! println!("waiting on task {}", name);
//! if let Ok(Err(e)) = handle.await {
//! println!("task {} failed: {}", name, e);
//! }
//! }
//!
//! Ok(())
//! }
//! # Ok::<(), anyhow::Error>(())
//! ```
use {
futures_util::{
future::{ready, BoxFuture, FutureExt},
Expand Down Expand Up @@ -85,6 +253,9 @@ type RequestMsg = (
oneshot::Sender<Result<Value, PubsubClientError>>,
);

/// A client for subscribing to messages from the RPC server.
///
/// See the [module documentation][self].
#[derive(Debug)]
pub struct PubsubClient {
subscribe_tx: mpsc::UnboundedSender<SubscribeRequestMsg>,
Expand Down Expand Up @@ -175,6 +346,15 @@ impl PubsubClient {
))
}

/// Subscribe to account events.
///
/// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`accountSubscribe`] RPC method.
///
/// [`accountSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#accountsubscribe
pub async fn account_subscribe(
&self,
pubkey: &Pubkey,
Expand All @@ -184,6 +364,18 @@ impl PubsubClient {
self.subscribe("account", params).await
}

/// Subscribe to block events.
///
/// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
///
/// This method is disabled by default. It can be enabled by passing
/// `--rpc-pubsub-enable-block-subscription` to `solana-validator`.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`blockSubscribe`] RPC method.
///
/// [`blockSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#blocksubscribe---unstable-disabled-by-default
pub async fn block_subscribe(
&self,
filter: RpcBlockSubscribeFilter,
Expand All @@ -192,6 +384,15 @@ impl PubsubClient {
self.subscribe("block", json!([filter, config])).await
}

/// Subscribe to transaction log events.
///
/// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`logsSubscribe`] RPC method.
///
/// [`logsSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#logssubscribe
pub async fn logs_subscribe(
&self,
filter: RpcTransactionLogsFilter,
Expand All @@ -200,6 +401,16 @@ impl PubsubClient {
self.subscribe("logs", json!([filter, config])).await
}

/// Subscribe to program account events.
///
/// Receives messages of type [`RpcKeyedAccount`] when an account owned
/// by the given program changes.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`programSubscribe`] RPC method.
///
/// [`programSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#programsubscribe
pub async fn program_subscribe(
&self,
pubkey: &Pubkey,
Expand All @@ -223,14 +434,52 @@ impl PubsubClient {
self.subscribe("program", params).await
}

/// Subscribe to vote events.
///
/// Receives messages of type [`RpcVote`] when a new vote is observed. These
/// votes are observed prior to confirmation and may never be confirmed.
///
/// This method is disabled by default. It can be enabled by passing
/// `--rpc-pubsub-enable-vote-subscription` to `solana-validator`.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`voteSubscribe`] RPC method.
///
/// [`voteSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#votesubscribe---unstable-disabled-by-default
pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
self.subscribe("vote", json!([])).await
}

/// Subscribe to root events.
///
/// Receives messages of type [`Slot`] when a new [root] is set by the
/// validator.
///
/// [root]: https://docs.solana.com/terminology#root
///
/// # RPC Reference
///
/// This method corresponds directly to the [`rootSubscribe`] RPC method.
///
/// [`rootSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#rootsubscribe
pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
self.subscribe("root", json!([])).await
}

/// Subscribe to transaction confirmation events.
///
/// Receives messages of type [`RpcSignatureResult`] when a transaction
/// with the given signature is committed.
///
/// This is a subscription to a single notification. It is automatically
/// cancelled by the server once the notification is sent.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`signatureSubscribe`] RPC method.
///
/// [`signatureSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#signaturesubscribe
pub async fn signature_subscribe(
&self,
signature: &Signature,
Expand All @@ -240,10 +489,33 @@ impl PubsubClient {
self.subscribe("signature", params).await
}

/// Subscribe to slot events.
///
/// Receives messages of type [`SlotInfo`] when a slot is processed.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`slotSubscribe`] RPC method.
///
/// [`slotSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsubscribe
pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
self.subscribe("slot", json!([])).await
}

/// Subscribe to slot update events.
///
/// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
///
/// Note that this method operates differently than other subscriptions:
/// instead of sending the message to a reciever on a channel, it accepts a
/// `handler` callback that processes the message directly. This processing
/// occurs on another thread.
///
/// # RPC Reference
///
/// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
///
/// [`slotUpdatesSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsupdatessubscribe---unstable
pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
self.subscribe("slotsUpdates", json!([])).await
}
Expand Down
Loading

0 comments on commit 0c1ff7c

Please sign in to comment.