Skip to content

Commit

Permalink
cln-plugin: Defer binding the plugin state until after configuring
Browse files Browse the repository at this point in the history
We had a bit of a chicken-and-egg problem, where we instantiated the
`state` to be managed by the `Plugin` during the very first step when
creating the `Builder`, but then the state might depend on the
configuration we only get later. This would force developers to add
placeholders in the form of `Option` into the state, when really
they'd never be none after configuring.

This defers the binding until after we get the configuration and
cleans up the semantics:

 - `Builder`: declare options, hooks, etc
 - `ConfiguredPlugin`: we have exchanged the handshake with
   `lightningd`, now we can construct the `state` accordingly
 - `Plugin`: Running instance of the plugin

Changelog-Changed: cln-plugin: Moved the state binding to the plugin until after the configuration step
  • Loading branch information
cdecker committed Sep 25, 2022
1 parent 3f5ff0c commit 8898511
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 111 deletions.
3 changes: 2 additions & 1 deletion cln-grpc/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ fn test_keysend() {
"035d2b1192dfba134e10e540875d366ebc8bc353d5aa766b80c090b39c3a5d885d",
)
.unwrap(),
msatoshi: Some(Amount { msat: 10000 }),
amount_msat: Some(Amount { msat: 10000 }),

label: Some("hello".to_string()),
exemptfee: None,
maxdelay: None,
Expand Down
6 changes: 4 additions & 2 deletions plugins/examples/cln-plugin-startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use cln_plugin::{options, Builder, Error, Plugin};
use tokio;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
if let Some(plugin) = Builder::new((), tokio::io::stdin(), tokio::io::stdout())
let state = ();

if let Some(plugin) = Builder::new(tokio::io::stdin(), tokio::io::stdout())
.option(options::ConfigOption::new(
"test-option",
options::Value::Integer(42),
Expand All @@ -15,7 +17,7 @@ async fn main() -> Result<(), anyhow::Error> {
.rpcmethod("testmethod", "This is a test", testmethod)
.subscribe("connect", connect_handler)
.hook("peer_connected", peer_connected_handler)
.start()
.start(state)
.await?
{
plugin.join().await
Expand Down
16 changes: 8 additions & 8 deletions plugins/grpc-plugin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@ async fn main() -> Result<()> {
let directory = std::env::current_dir()?;
let (identity, ca_cert) = tls::init(&directory)?;

let state = PluginState {
rpc_path: path.into(),
identity,
ca_cert,
};

let plugin = match Builder::new(state.clone(), tokio::io::stdin(), tokio::io::stdout())
let plugin = match Builder::new(tokio::io::stdin(), tokio::io::stdout())
.option(options::ConfigOption::new(
"grpc-port",
options::Value::Integer(-1),
Expand All @@ -54,7 +48,13 @@ async fn main() -> Result<()> {
Some(o) => return Err(anyhow!("grpc-port is not a valid integer: {:?}", o)),
};

let plugin = plugin.start().await?;
let state = PluginState {
rpc_path: path.into(),
identity,
ca_cert,
};

let plugin = plugin.start(state.clone()).await?;

let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap();

Expand Down
200 changes: 100 additions & 100 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::codec::{JsonCodec, JsonRpcCodec};
pub use anyhow::{anyhow, Context};
use futures::sink::SinkExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
extern crate log;
use log::trace;
use messages::Configuration;
Expand All @@ -13,6 +14,7 @@ use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tokio_util::codec::FramedWrite;
use options::ConfigOption;

pub mod codec;
pub mod logging;
Expand All @@ -23,7 +25,6 @@ extern crate serde_json;

pub mod options;

use options::ConfigOption;

/// Need to tell us about something that went wrong? Use this error
/// type to do that. Use this alias to be safe from future changes in
Expand All @@ -38,34 +39,81 @@ where
O: Send + AsyncWrite + Unpin,
S: Clone + Send,
{
state: S,

input: Option<I>,
output: Option<O>,

hooks: HashMap<String, Hook<S>>,
options: Vec<ConfigOption>,
configuration: Option<Configuration>,
rpcmethods: HashMap<String, RpcMethod<S>>,
subscriptions: HashMap<String, Subscription<S>>,
dynamic: bool,
}

/// A plugin that has registered with the lightning daemon, and gotten
/// its options filled, however has not yet acknowledged the `init`
/// message. This is a mid-state allowing a plugin to disable itself,
/// based on the options.
pub struct ConfiguredPlugin<S, I, O>
where
S: Clone + Send,
{
init_id: serde_json::Value,
input: FramedRead<I, JsonRpcCodec>,
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
options: Vec<ConfigOption>,
configuration: Configuration,
rpcmethods: HashMap<String, AsyncCallback<S>>,
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
}

/// The [PluginDriver] is used to run the IO loop, reading messages
/// from the Lightning daemon, dispatching calls and notifications to
/// the plugin, and returning responses to the the daemon. We also use
/// it to handle spontaneous messages like Notifications and logging
/// events.
struct PluginDriver<S>
where
S: Send + Clone,
{
plugin: Plugin<S>,
rpcmethods: HashMap<String, AsyncCallback<S>>,

#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
}

#[derive(Clone)]
pub struct Plugin<S>
where
S: Clone + Send,
{
/// The state gets cloned for each request
state: S,
/// "options" field of "init" message sent by cln
options: Vec<ConfigOption>,
/// "configuration" field of "init" message sent by cln
configuration: Configuration,
/// A signal that allows us to wait on the plugin's shutdown.
wait_handle: tokio::sync::broadcast::Sender<()>,

sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}

impl<S, I, O> Builder<S, I, O>
where
O: Send + AsyncWrite + Unpin + 'static,
S: Clone + Sync + Send + Clone + 'static,
I: AsyncRead + Send + Unpin + 'static,
{
pub fn new(state: S, input: I, output: O) -> Self {
pub fn new(input: I, output: O) -> Self {
Self {
state,
input: Some(input),
output: Some(output),
hooks: HashMap::new(),
subscriptions: HashMap::new(),
options: vec![],
configuration: None,
rpcmethods: HashMap::new(),
dynamic: false,
}
Expand All @@ -91,7 +139,7 @@ where
/// Ok(())
/// }
///
/// let b = Builder::new((), tokio::io::stdin(), tokio::io::stdout())
/// let b = Builder::new(tokio::io::stdin(), tokio::io::stdout())
/// .subscribe("connect", connect_handler);
/// ```
pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
Expand Down Expand Up @@ -195,10 +243,9 @@ where
))
}
};
let init_id = match input.next().await {
let (init_id, configuration) = match input.next().await {
Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
self.handle_init(m)?;
id
(id, self.handle_init(m)?)
}

Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
Expand All @@ -210,44 +257,27 @@ where
}
};

let (wait_handle, _) = tokio::sync::broadcast::channel(1);

// An MPSC pair used by anything that needs to send messages
// to the main daemon.
let (sender, receiver) = tokio::sync::mpsc::channel(4);
let plugin = Plugin {
state: self.state,
options: self.options,
configuration: self
.configuration
.ok_or(anyhow!("Plugin configuration missing"))?,
wait_handle,
sender,
};

// TODO Split the two hashmaps once we fill in the hook
// payload structs in messages.rs
let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));

let subscriptions =
HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));

// Leave the `init` reply pending, so we can disable based on
// the options if required.
Ok(Some(ConfiguredPlugin {
// The JSON-RPC `id` field so we can reply correctly.
init_id,
input,
output,
receiver,
driver: PluginDriver {
plugin: plugin.clone(),
rpcmethods,
hooks: HashMap::new(),
subscriptions: HashMap::from_iter(
self.subscriptions.drain().map(|(k, v)| (k, v.callback)),
),
},
plugin,
rpcmethods,
subscriptions,
options: self.options,
configuration,
hooks: HashMap::new(),
}))
}

Expand All @@ -261,9 +291,9 @@ where
/// `Plugin` instance and return `None` instead. This signals that
/// we should exit, and not continue running. `start()` returns in
/// order to allow user code to perform cleanup if necessary.
pub async fn start(self) -> Result<Option<Plugin<S>>, anyhow::Error> {
pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
if let Some(cp) = self.configure().await? {
Ok(Some(cp.start().await?))
Ok(Some(cp.start(state).await?))
} else {
Ok(None)
}
Expand Down Expand Up @@ -292,7 +322,7 @@ where
}
}

fn handle_init(&mut self, call: messages::InitCall) -> Result<messages::InitResponse, Error> {
fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
use options::Value as OValue;
use serde_json::Value as JValue;

Expand All @@ -312,9 +342,7 @@ where
}
}

self.configuration = Some(call.configuration);

Ok(messages::InitResponse::default())
Ok(call.configuration)
}
}

Expand Down Expand Up @@ -356,39 +384,6 @@ where
callback: AsyncCallback<S>,
}

/// A plugin that has registered with the lightning daemon, and gotten
/// its options filled, however has not yet acknowledged the `init`
/// message. This is a mid-state allowing a plugin to disable itself,
/// based on the options.
pub struct ConfiguredPlugin<S, I, O>
where
S: Clone + Send,
{
init_id: serde_json::Value,
input: FramedRead<I, JsonRpcCodec>,
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
plugin: Plugin<S>,
driver: PluginDriver<S>,
receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
}

#[derive(Clone)]
pub struct Plugin<S>
where
S: Clone + Send,
{
/// The state gets cloned for each request
state: S,
/// "options" field of "init" message sent by cln
options: Vec<ConfigOption>,
/// "configuration" field of "init" message sent by cln
configuration: Configuration,
/// A signal that allows us to wait on the plugin's shutdown.
wait_handle: tokio::sync::broadcast::Sender<()>,

sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}

impl<S> Plugin<S>
where
S: Clone + Send,
Expand All @@ -409,12 +404,30 @@ where
O: Send + AsyncWrite + Unpin + 'static,
{
#[allow(unused_mut)]
pub async fn start(mut self) -> Result<Plugin<S>, anyhow::Error> {
let driver = self.driver;
let plugin = self.plugin;
pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
let output = self.output;
let input = self.input;
let receiver = self.receiver; // Now reply to the `init` message that `configure` left pending.
let (wait_handle, _) = tokio::sync::broadcast::channel(1);

// An MPSC pair used by anything that needs to send messages
// to the main daemon.
let (sender, receiver) = tokio::sync::mpsc::channel(4);

let plugin = Plugin {
state,
options: self.options,
configuration: self.configuration,
wait_handle,
sender,
};

let driver = PluginDriver {
plugin: plugin.clone(),
rpcmethods: self.rpcmethods,
hooks: self.hooks,
subscriptions: self.subscriptions,
};

output
.lock()
.await
Expand Down Expand Up @@ -465,28 +478,14 @@ where
}

pub fn option(&self, name: &str) -> Option<options::Value> {
self.plugin.option(name)
self.options
.iter()
.filter(|o| o.name() == name)
.next()
.map(|co| co.value.clone().unwrap_or(co.default().clone()))
}
}

/// The [PluginDriver] is used to run the IO loop, reading messages
/// from the Lightning daemon, dispatching calls and notifications to
/// the plugin, and returning responses to the the daemon. We also use
/// it to handle spontaneous messages like Notifications and logging
/// events.
struct PluginDriver<S>
where
S: Send + Clone,
{
plugin: Plugin<S>,
rpcmethods: HashMap<String, AsyncCallback<S>>,

#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
}

use tokio::io::{AsyncReadExt, AsyncWriteExt};
impl<S> PluginDriver<S>
where
S: Send + Clone,
Expand Down Expand Up @@ -688,7 +687,8 @@ mod test {

#[tokio::test]
async fn init() {
let builder = Builder::new((), tokio::io::stdin(), tokio::io::stdout());
let _ = builder.start();
let state = ();
let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
let _ = builder.start(state);
}
}

0 comments on commit 8898511

Please sign in to comment.