Skip to content

Commit

Permalink
feat(plugin): add async io for the plugin
Browse files Browse the repository at this point in the history
Adding the support of the async io for reading to the
std io.

Link: #98
Signed-off-by: Vincenzo Palazzo <[email protected]>
  • Loading branch information
vincenzopalazzo committed Mar 24, 2024
1 parent 56b385f commit 0fa3096
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 26 deletions.
3 changes: 2 additions & 1 deletion plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ keywords = [ "plugin", "cln", "rpc", "lightning", "bitcoin" ]
readme = "README.md"

[dependencies]
clightningrpc-common = { version = "0.3.0-beta.4" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clightningrpc-common = { version = "0.3.0-beta.4" }
log = { version = "0.4.17", optional = true }
mio = { version = "0.8.10", features = ["os-ext"] }

[features]
log = ["dep:log"]
77 changes: 77 additions & 0 deletions plugin/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! async io module of the plugin io.
//!
//! Vincenzo Palazzo <[email protected]>
use std::io;
use std::io::{Read, Write};
use std::os::fd::AsRawFd;

const IN: mio::Token = mio::Token(0);

pub(crate) struct AsyncIO {
poll: mio::Poll,
}

impl AsyncIO {
/// Create a new instance of an AsyncIO
pub fn new() -> io::Result<Self> {
Ok(Self {
poll: mio::Poll::new()?,
})
}

pub fn register(&mut self) -> io::Result<()> {
let stdin = std::io::stdin().as_raw_fd();
let mut stdin = mio::unix::SourceFd(&stdin);

self.poll.registry().register(
&mut stdin,
IN,
mio::Interest::READABLE | mio::Interest::WRITABLE,
)?;
Ok(())
}

pub fn into_loop<F: FnMut(String) -> Option<String>>(
&mut self,
mut async_callback: F,
) -> io::Result<()> {
let mut events = mio::Events::with_capacity(1024);
loop {
self.poll.poll(&mut events, None)?;
for event in events.iter() {
#[cfg(feature = "log")]
log::info!("getting the event: {:?}", event);
match event.token() {
IN => {
if event.is_readable() {
let mut reader = io::stdin().lock();
let mut buffer = String::new();
loop {
let mut byte = [0; 1];
reader.read_exact(&mut byte).unwrap();

// Append the byte to the buffer
buffer.push(byte[0] as char);

// Check if the buffer ends with double newline
if buffer.ends_with("\n\n") {
drop(reader);
break; // Exit the loop
}
}
let Some(resp) = async_callback(buffer.clone()) else {
continue;
};
let mut writer = io::stdout().lock();
writer.write_all(resp.as_bytes())?;
writer.flush()?;
}
}
_ => unreachable!(),
}
#[cfg(feature = "log")]
log::info!("event handled: {:?}", event);
}
}
}
}
1 change: 1 addition & 0 deletions plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![crate_name = "clightningrpc_plugin"]
pub mod commands;
pub mod errors;
mod io;
pub mod macros;
pub mod plugin;
pub mod types;
51 changes: 26 additions & 25 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ use std::io::Write;
use std::string::String;
use std::sync::Arc;

use serde_json::Value;

use clightningrpc_common::json_utils::{add_str, init_payload, init_success_response};
use clightningrpc_common::types::Request;
use serde_json::Value;

use crate::commands::builtin::{InitRPC, ManifestRPC};
use crate::commands::types::{CLNConf, RPCHookInfo, RPCMethodInfo};
use crate::commands::RPCCommand;
use crate::errors::PluginError;
use crate::io::AsyncIO;
use crate::types::{LogLevel, RpcOption};

#[cfg(feature = "log")]
Expand Down Expand Up @@ -64,10 +66,10 @@ impl log::Log for Log {

fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let mut writer = io::stdout().lock();
let level: LogLevel = record.level().into();
let msg = record.args();

let mut writer = io::stdout();
let mut payload = init_payload();
add_str(&mut payload, "level", &level.to_string());
add_str(&mut payload, "message", &format!("{msg}"));
Expand All @@ -77,10 +79,8 @@ impl log::Log for Log {
method: "log".to_owned(),
params: payload,
};
writer
.write_all(serde_json::to_string(&request).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();
let _ = writer.write_all(serde_json::to_string(&request).unwrap().as_bytes());
let _ = writer.flush();
}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl<'a, T: 'a + Clone> Plugin<T> {
}

pub fn log(&self, level: LogLevel, msg: &str) {
let mut writer = io::stdout();
let mut writer = io::stdout().lock();
let mut payload = init_payload();
add_str(&mut payload, "level", &level.to_string());
add_str(&mut payload, "message", msg);
Expand Down Expand Up @@ -256,10 +256,7 @@ impl<'a, T: 'a + Clone> Plugin<T> {
}
}

pub fn start(mut self) {
let reader = io::stdin();
let mut writer = io::stdout();
let mut buffer = String::new();
pub fn start(mut self) -> io::Result<()> {
#[cfg(feature = "log")]
{
use std::str::FromStr;
Expand All @@ -276,29 +273,33 @@ impl<'a, T: 'a + Clone> Plugin<T> {
on_init: self.on_init.clone(),
}),
);
// FIXME: core lightning end with the double endline, so this can cause
// problem for some input reader.
// we need to parse the writer, and avoid this while loop
while let Ok(_) = reader.read_line(&mut buffer) {
let req_str = buffer.to_string();
buffer.clear();
let Ok(request) = serde_json::from_str::<Request<serde_json::Value>>(&req_str) else {
continue;
};
let mut asyncio = AsyncIO::new()?;
asyncio.register()?;
asyncio.into_loop(|buffer| {
#[cfg(feature = "log")]
log::info!("looping around the string: {buffer}");
let request: Request<serde_json::Value> = serde_json::from_str(&buffer).unwrap();
if let Some(id) = request.id {
// when the id is specified this is a RPC or Hook, so we need to return a response
let response = self.call_rpc_method(&request.method, request.params);
let mut rpc_response = init_success_response(id);
self.write_respose(&response, &mut rpc_response);
writer
.write_all(serde_json::to_string(&rpc_response).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();
#[cfg(feature = "log")]
log::info!(
"rpc or hook: {} with reponse {:?}",
request.method,
rpc_response
);
return Some(serde_json::to_string(&rpc_response).unwrap());
} else {
// in case of the id is None, we are receiving the notification, so the server is not
// interested in the answer.
self.handle_notification(&request.method, request.params);
#[cfg(feature = "log")]
log::info!("notification: {}", request.method);
return None;
}
}
})?;
Ok(())
}
}

0 comments on commit 0fa3096

Please sign in to comment.