Skip to content

Commit

Permalink
kble: kill after grace period
Browse files Browse the repository at this point in the history
  • Loading branch information
kobkaz committed Apr 30, 2024
1 parent 00a923c commit f884ac5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 14 deletions.
22 changes: 13 additions & 9 deletions kble/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct Connections<'a> {
// Some: connections not used yet
// None: connections is used in a link
map: HashMap<&'a str, Connection>,
max_child_wait_secs: u64,
termination_grace_period_secs: u64,
}

struct Link<'a> {
Expand All @@ -28,8 +28,8 @@ struct Link<'a> {
dest: plug::PlugSink,
}

pub async fn run(config: &Config, max_child_wait_secs: u64) -> Result<()> {
let mut conns = connect_to_plugs(config, max_child_wait_secs).await?;
pub async fn run(config: &Config, termination_grace_period_secs: u64) -> Result<()> {
let mut conns = connect_to_plugs(config, termination_grace_period_secs).await?;
let links = connect_links(&mut conns, config);

let (quit_tx, _) = broadcast::channel(1);
Expand All @@ -53,10 +53,10 @@ pub async fn run(config: &Config, max_child_wait_secs: u64) -> Result<()> {
}

impl<'a> Connections<'a> {
fn new(max_child_wait_secs: u64) -> Self {
fn new(termination_grace_period_secs: u64) -> Self {
Self {
map: HashMap::new(),
max_child_wait_secs,
termination_grace_period_secs,
}
}

Expand Down Expand Up @@ -98,7 +98,7 @@ impl<'a> Connections<'a> {
// close all connections
// assume all links are returned
async fn close_and_wait(self) -> Result<()> {
let futs = self.map.into_iter().map(|(name, conn)| async move {
let futs = self.map.into_iter().map(|(name, mut conn)| async move {
let fut = async {
if let Some(mut s) = conn.sink {
debug!("Closing {name}");
Expand All @@ -111,7 +111,7 @@ impl<'a> Connections<'a> {
anyhow::Ok(())
};
let close_result = tokio::time::timeout(
std::time::Duration::from_secs(self.max_child_wait_secs),
std::time::Duration::from_secs(self.termination_grace_period_secs),
fut,
)
.await;
Expand All @@ -121,6 +121,7 @@ impl<'a> Connections<'a> {
Err(_) => {
// abandon the connection
warn!("Plug {name} didn't exit in time");
conn.backend.kill().await?;
Ok(())
}
}
Expand All @@ -138,8 +139,11 @@ impl<'a> Connections<'a> {
}
}

async fn connect_to_plugs(config: &Config, max_child_wait_secs: u64) -> Result<Connections> {
let mut conns = Connections::new(max_child_wait_secs);
async fn connect_to_plugs(
config: &Config,
termination_grace_period_secs: u64,
) -> Result<Connections> {
let mut conns = Connections::new(termination_grace_period_secs);
for (name, url) in config.plugs().iter() {
debug!("Connecting to {name}");
let connect_result = plug::connect(url).await.with_context(move || {
Expand Down
7 changes: 4 additions & 3 deletions kble/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ struct Args {
#[clap(long, short)]
spaghetti: PathBuf,

/// Maximum time to wait for a child process to exit after a closing handshake
/// Period to wait for each child process to exit after a closing handshake
/// before killing it
#[clap(long, default_value_t = 10)]
max_child_wait_secs: u64,
termination_grace_period_secs: u64,
}

impl Args {
Expand Down Expand Up @@ -49,6 +50,6 @@ async fn main() -> Result<()> {

let args = Args::parse_with_license_notice(include_notice!());
let config = args.load_spaghetti_config()?;
app::run(&config, args.max_child_wait_secs).await?;
app::run(&config, args.termination_grace_period_secs).await?;
Ok(())
}
11 changes: 9 additions & 2 deletions kble/src/plug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@ pub enum Backend {
}

impl Backend {
pub async fn wait(self) -> Result<()> {
pub async fn wait(&mut self) -> Result<()> {
match self {
Backend::WebSocketClient => Ok(()),
Backend::StdioProcess(mut proc) => {
Backend::StdioProcess(proc) => {
proc.wait()
.await
.with_context(|| format!("Failed to wait for {:?}", proc))?;
Ok(())
}
}
}

pub async fn kill(self) -> Result<()> {
match self {
Backend::WebSocketClient => Ok(()),
Backend::StdioProcess(mut proc) => proc.kill().await.map_err(Into::into),
}
}
}

pub async fn connect(url: &Url) -> Result<(Backend, PlugSink, PlugStream)> {
Expand Down

0 comments on commit f884ac5

Please sign in to comment.