cln-plugin: Implement logging facade adapter for cln plugins
We wrap emitted messages into a JSON-RPC notification envelope and
write them to stdout. We use an indirection over an mpsc channel in
order to avoid deadlocks if we emit logs while holding the writer lock
on stdout.
cdecker authored and rustyrussell committed Mar 9, 2022
1 parent 9ae1f33 commit f5e1829
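
Concretely, a `log::info!("hello")` call inside a plugin ends up on stdout as one JSON-RPC notification per log entry, roughly like the line below (method name and `params` shape as in `logging.rs`; exact key order is a serialization detail):

    {"jsonrpc": "2.0", "method": "log", "params": {"level": "info", "message": "hello"}}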
Showing 4 changed files with 118 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cln-rpc/examples/getinfo.rs
@@ -13,6 +13,6 @@ async fn main() -> Result<(), anyhow::Error> {

let mut rpc = ClnRpc::new(p).await?;
let response = rpc.call(Request::Getinfo(GetinfoRequest {})).await?;
- info!("{}", serde_json::to_string_pretty(&response)?);
+ println!("{}", serde_json::to_string_pretty(&response)?);
Ok(())
}
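
Note: the standalone `getinfo` example is a plain RPC client rather than a plugin, so it never installs a `log` backend; presumably that is why it now prints the response with `println!` instead of an `info!` call whose output would otherwise be silently discarded.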
4 changes: 2 additions & 2 deletions plugins/Cargo.toml
@@ -10,11 +10,11 @@ path = "examples/cln-plugin-startup.rs"
[dependencies]
anyhow = "1.0.51"
bytes = "1.1.0"
log = "0.4.14"
log = { version = "0.4.14", features = ['std'] }
serde = { version = "1.0.131", features = ["derive"] }
serde_json = "1.0.72"
tokio-util = { version = "0.6.9", features = ["codec"] }
tokio = { version="1", features = ['io-std', 'rt'] }
tokio = { version="1", features = ['io-std', 'rt', 'sync'] }
tokio-stream = "*"
futures = "0.3"
cln-rpc = { path = "../cln-rpc" }
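The two new feature flags match the APIs used below: `log::set_boxed_logger` is only available with the `log` crate's `std` feature, and `tokio::sync::{Mutex, mpsc}`, which back the shared writer lock and the flusher channel, sit behind tokio's `sync` feature.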
40 changes: 23 additions & 17 deletions plugins/src/lib.rs
@@ -1,16 +1,19 @@
use crate::codec::{JsonCodec, JsonRpcCodec};
use futures::sink::SinkExt;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::codec::FramedWrite;
pub mod codec;
mod messages;
pub use anyhow::Error;
use futures::sink::SinkExt;
extern crate log;
use log::{trace, warn};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tokio_util::codec::FramedWrite;

pub mod codec;
pub mod logging;
mod messages;

#[macro_use]
extern crate serde_json;
@@ -50,19 +53,19 @@ where
}
}

pub async fn run(self) -> Result<(), Error> {
let (plugin, input) = self.build();
plugin.run(input).await
}

pub fn build(self) -> (Plugin<S, I, O>, I) {
let output = Arc::new(Mutex::new(FramedWrite::new(
self.output,
JsonCodec::default(),
)));

// Now configure the logging, so any `log` call is wrapped
// in a JSON-RPC notification and sent to c-lightning
tokio::spawn(async move {});
(
Plugin {
state: Arc::new(Mutex::new(self.state)),
output: Arc::new(Mutex::new(FramedWrite::new(
self.output,
JsonCodec::default(),
))),
output,
input_type: PhantomData,
},
self.input,
@@ -74,7 +77,7 @@ pub struct Plugin<S, I, O>
where
S: Clone + Send,
I: AsyncRead,
O: Send + AsyncWrite,
O: Send + AsyncWrite + 'static,
{
//input: FramedRead<Stdin, JsonCodec>,
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
@@ -87,11 +90,14 @@ impl<S, I, O> Plugin<S, I, O>
where
S: Clone + Send,
I: AsyncRead + Send + Unpin,
O: Send + AsyncWrite + Unpin,
O: Send + AsyncWrite + Unpin + 'static,
{
/// Read incoming requests from `c-lightning` and dispatch their handling.
#[allow(unused_mut)]
pub async fn run(mut self, input: I) -> Result<(), Error> {
crate::logging::init(self.output.clone()).await?;
trace!("Plugin logging initialized");

let mut input = FramedRead::new(input, JsonRpcCodec::default());
loop {
match input.next().await {
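`run()` now installs the logger before entering the read loop, and the installed logger never writes to the shared `FramedWrite` itself: it pushes entries onto an unbounded channel that a spawned flusher task drains. Below is a minimal self-contained sketch of that indirection with the plugin specifics stripped out; the names and the `Vec<String>` sink are illustrative only, and it assumes a tokio runtime with the `macros`, `rt-multi-thread`, and `sync` features:

    use std::sync::Arc;
    use tokio::sync::{mpsc, Mutex};
    use tokio::task::JoinHandle;

    /// Spawn a flusher that owns all writes to `out`; producers only enqueue.
    fn spawn_flusher(
        out: Arc<Mutex<Vec<String>>>,
    ) -> (mpsc::UnboundedSender<String>, JoinHandle<()>) {
        let (tx, mut rx) = mpsc::unbounded_channel::<String>();
        let handle = tokio::spawn(async move {
            while let Some(line) = rx.recv().await {
                // Only this task ever takes the lock, so a producer running while
                // the lock is held elsewhere can still enqueue without deadlocking.
                out.lock().await.push(line);
            }
        });
        (tx, handle)
    }

    #[tokio::main]
    async fn main() {
        let sink = Arc::new(Mutex::new(Vec::new()));
        let (tx, flusher) = spawn_flusher(sink.clone());
        // `send` is synchronous and never blocks, which is exactly what the
        // synchronous `log::Log::log` callback needs.
        tx.send("hello".to_string()).unwrap();
        drop(tx); // closing the channel lets the flusher drain and exit
        flusher.await.unwrap();
        assert_eq!(*sink.lock().await, vec!["hello".to_string()]);
    }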
92 changes: 92 additions & 0 deletions plugins/src/logging.rs
@@ -0,0 +1,92 @@
use crate::codec::JsonCodec;
use futures::SinkExt;
use log::{Level, Metadata, Record};
use serde::Serialize;
use std::sync::Arc;
use tokio::io::AsyncWrite;
use tokio::sync::Mutex;
use tokio_util::codec::FramedWrite;

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
struct LogEntry {
level: LogLevel,
message: String,
}

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
enum LogLevel {
Debug,
Info,
Warn,
Error,
}

impl From<log::Level> for LogLevel {
fn from(lvl: log::Level) -> Self {
match lvl {
log::Level::Error => LogLevel::Error,
log::Level::Warn => LogLevel::Warn,
log::Level::Info => LogLevel::Info,
log::Level::Debug | log::Level::Trace => LogLevel::Debug,
}
}
}

/// A simple logger that wraps each log entry in a JSON-RPC
/// notification and delivers it to `lightningd`.
struct PluginLogger {
// An unbounded mpsc channel we can use to talk to the
// flusher. This avoids having circular locking dependencies if we
// happen to emit a log record while holding the lock on the
// plugin connection.
sender: tokio::sync::mpsc::UnboundedSender<LogEntry>,
}

/// Initialize the logger starting a flusher to the passed in sink.
pub async fn init<O>(out: Arc<Mutex<FramedWrite<O, JsonCodec>>>) -> Result<(), log::SetLoggerError>
where
O: AsyncWrite + Send + Unpin + 'static,
{
let out = out.clone();
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<LogEntry>();
tokio::spawn(async move {
while let Some(i) = receiver.recv().await {
// We continue draining the queue, even if we get some
// errors when forwarding. Forwarding could break due to
// an interrupted connection or stdout being closed, but
// letting the messages pile up in the queue would just leak memory.
let _ = out
.lock()
.await
.send(json!({
"jsonrpc": "2.0",
"method": "log",
"params": i
}))
.await;
}
});
log::set_boxed_logger(Box::new(PluginLogger { sender }))
.map(|()| log::set_max_level(log::LevelFilter::Debug))
}

impl log::Log for PluginLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Debug
}

fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
self.sender
.send(LogEntry {
level: record.level().into(),
message: record.args().to_string(),
})
.unwrap();
}
}

fn flush(&self) {}
}
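
Two details of the above are worth calling out: `log::Level::Trace` is folded into `Debug` by the `From` impl, and `set_max_level(LevelFilter::Debug)` together with `enabled()` means everything up to debug level is forwarded. For completeness, here is a hypothetical wiring sketch mirroring what `build()`/`run()` do internally; it assumes the crate is imported as `cln_plugin`, that `JsonCodec` is the codec from the crate's `codec` module, and a binary with tokio's `macros`, `rt-multi-thread`, `io-std`, `sync`, and `time` features:

    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tokio_util::codec::FramedWrite;

    use cln_plugin::codec::JsonCodec;

    #[tokio::main]
    async fn main() -> Result<(), log::SetLoggerError> {
        // The same kind of shared writer that Plugin::build() creates.
        let out = Arc::new(Mutex::new(FramedWrite::new(
            tokio::io::stdout(),
            JsonCodec::default(),
        )));
        cln_plugin::logging::init(out).await?;

        // From here on, ordinary `log` macros become "log" notifications on stdout.
        log::info!("hello from the plugin");
        log::warn!("this one is forwarded with level \"warn\"");

        // The flusher runs on a spawned task; give it a moment before exiting.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        Ok(())
    }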
