Skip to content

Commit

Permalink
use tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Jul 25, 2022
1 parent 1b41c5f commit d8997f3
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 55 deletions.
119 changes: 79 additions & 40 deletions rust/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ use crate::cxxrsutil::*;
use crate::ffi::{
OverrideReplacementSource, OverrideReplacementType, ParsedRevision, ParsedRevisionKind,
};
use anyhow::{anyhow, format_err, Result, Context};
use anyhow::{anyhow, format_err, Context, Result};
use cap_std::fs::Dir;
use cap_std_ext::dirext::CapStdExtDirExt;
use cap_std_ext::{cap_std, rustix};
use fn_error_context::context;
use glib::prelude::*;
use once_cell::sync::Lazy;
use ostree_ext::{gio, glib, ostree};
use rustix::fd::{BorrowedFd, FromRawFd};
use rustix::fs::MetadataExt;
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::net::{UnixStream, UnixListener};
use std::path::Path;
use std::sync::Mutex;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::oneshot::{Receiver, Sender};

const RPM_OSTREED_COMMIT_VERIFICATION_CACHE: &str = "rpm-ostree/gpgcheck-cache";

Expand Down Expand Up @@ -137,7 +140,8 @@ fn deployment_populate_variant_origin(
/// returned cleanly.
pub(crate) fn start_daemon_via_socket() -> CxxResult<()> {
let address = "/run/rpm-ostree/client.sock";
let s = UnixStream::connect(address)
tracing::debug!("Starting daemon via {address}");
let s = std::os::unix::net::UnixStream::connect(address)
.with_context(|| anyhow!("Failed to connect to {}", address))?;
let mut s = std::io::BufReader::new(s);
let mut r = String::new();
Expand All @@ -150,86 +154,121 @@ pub(crate) fn start_daemon_via_socket() -> CxxResult<()> {
}
}

fn send_init_result_to_client(client: &UnixStream, err: &Result<()>) {
let mut client = std::io::BufWriter::new(client);
match err {
Ok(_) => {
// On successwe close the stream without writing anything,
// which acknowledges successful startup to the client.
}
Err(e) => {
let msg = e.to_string();
match client
.write_all(msg.as_bytes())
.and_then(|_| client.flush())
{
Ok(_) => {}
Err(inner_err) => {
eprintln!(
"Failed to write error message to client socket (original error: {}): {}",
e, inner_err
);
async fn send_ok_result_to_client(_client: UnixStream) {
// On success we close the stream without writing anything,
// which acknowledges successful startup to the client.
// In the future we may actually implement a protocol here, so this
// is stubbed out as a full async fn in preparation for that.
tracing::debug!("Acknowleged client");
}

static SHUTDOWN_SIGNAL: Lazy<Mutex<Option<Sender<()>>>> = Lazy::new(|| Mutex::new(None));

async fn process_clients_with_ok(listener: UnixListener, mut cancel: Receiver<()>) {
tracing::debug!("Processing clients...");
loop {
tokio::select! {
_ = &mut cancel => {
tracing::debug!("Got cancellation event");
return
}
r = listener.accept() => {
match r {
Ok((stream, _addr)) => {
send_ok_result_to_client(stream).await;
},
Err(e) => {
tracing::debug!("failed to accept client: {e}")
}
}
}
}
}
}

fn process_clients(listener: UnixListener, res: &Result<()>) {
for stream in listener.incoming() {
match stream {
Ok(stream) => send_init_result_to_client(&stream, res),
Err(e) => {
// This shouldn't be fatal, we continue to start up.
eprintln!("Failed to listen for client stream: {}", e);
}
}
if res.is_err() {
break;
/// Ensure all asynchronous tasks in this Rust half of the daemon code are stopped.
/// Called from C++.
pub(crate) fn daemon_terminate() {
let chan = (*SHUTDOWN_SIGNAL).lock().unwrap().take().unwrap();
let _ = chan.send(());
}

fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Error) -> Result<()> {
let mut incoming = match listener.incoming().next() {
Some(r) => r?,
None => {
anyhow::bail!("Expected to find client socket from activation");
}
}
};

let buf = format!("{e}");
incoming.write_all(buf.as_bytes())?;

todo!()
}

/// Perform initialization steps required by systemd service activation.
///
/// This ensures that the system is running under systemd, then receives the
/// socket-FD for main IPC logic, and notifies systemd about ready-state.
pub(crate) fn daemon_main(debug: bool) -> Result<()> {
let handle = tokio::runtime::Handle::current();
let _tokio_guard = handle.enter();
use std::os::unix::net::UnixListener as StdUnixListener;
if !systemd::daemon::booted()? {
return Err(anyhow!("not running as a systemd service"));
}

let init_res: Result<()> = crate::ffi::daemon_init_inner(debug).map_err(|e| e.into());
tracing::debug!("Initialization result: {init_res:?}");

let mut fds = systemd::daemon::listen_fds(false)?.iter();
let listener = match fds.next() {
None => {
// If started directly via `systemctl start` or DBus activation, we
// directly propagate the error back to our exit code.
init_res?;
UnixListener::bind("/run/rpmostreed.socket")?
tracing::debug!("Initializing directly (not socket activated)");
StdUnixListener::bind("/run/rpm-ostree/client.sock")?
}
Some(fd) => {
if fds.next().is_some() {
return Err(anyhow!("Expected exactly 1 fd from systemd activation"));
}
let listener = unsafe { UnixListener::from_raw_fd(fd) };
tracing::debug!("Initializing from socket activation; fd={fd}");
let listener = unsafe { StdUnixListener::from_raw_fd(fd) };

match init_res {
Ok(_) => listener,
Err(e) => {
let err_copy = Err(anyhow!("{}", e));
process_clients(listener, &err_copy);
let err_copy = anyhow!("{e}");
tracing::debug!("Reporting initialization error: {e}");
match process_one_client(listener, err_copy) {
Ok(()) => {
tracing::debug!("Acknowleged initial client");
}
Err(e) => {
tracing::debug!("Caught error while processing client {e}");
}
}
return Err(e);
}
}
}
};

// On success, we spawn a helper thread that just responds with
let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
(*SHUTDOWN_SIGNAL).lock().unwrap().replace(shutdown_send);

let listener = UnixListener::from_std(listener)?;

// On success, we spawn a helper task that just responds with
// sucess to clients that connect via the socket. In the future,
// perhaps we'll expose an API here.
std::thread::spawn(move || process_clients(listener, &Ok(())));
tracing::debug!("Spawning acknowledgement task");
tokio::task::spawn(async { process_clients_with_ok(listener, shutdown_recv).await });

tracing::debug!("Entering daemon mainloop");
// And now, enter the main loop.
Ok(crate::ffi::daemon_main_inner()?)
}
Expand Down
1 change: 1 addition & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ pub mod ffi {
extern "Rust" {
fn daemon_main(debug: bool) -> Result<()>;
fn start_daemon_via_socket() -> Result<()>;
fn daemon_terminate();
fn daemon_sanitycheck_environment(sysroot: &OstreeSysroot) -> Result<()>;
fn deployment_generate_id(deployment: &OstreeDeployment) -> String;
fn deployment_populate_variant(
Expand Down
4 changes: 3 additions & 1 deletion rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ fn inner_main() -> Result<i32> {
.enable_all()
.build()
.context("Failed to build tokio runtime")?;
runtime.block_on(dispatch_multicall(callname, args))
let r = runtime.block_on(dispatch_multicall(callname, args));
tracing::debug!("Exiting inner main with result: {r:?}");
r
}

fn print_error(e: anyhow::Error) {
Expand Down
2 changes: 1 addition & 1 deletion src/app/libmain.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ rpmostree_option_context_parse (GOptionContext *context, const GOptionEntry *mai
return rpmostreecxx::client_throw_non_ostree_host_error (error);
}

rpmostreecxx::start_daemon_via_socket();
CXX_TRY (rpmostreecxx::start_daemon_via_socket (), error);

/* root never needs to auth */
if (getuid () != 0)
Expand Down
24 changes: 11 additions & 13 deletions src/app/rpmostree-builtin-start-daemon.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
#include "rpmostree-builtins.h"
#include "rpmostree-libbuiltin.h"
#include "rpmostree-util.h"
#include "rpmostreed-utils.h"
#include "rpmostreed-daemon.h"
#include "rpmostreed-utils.h"

typedef enum
{
Expand Down Expand Up @@ -213,13 +213,14 @@ on_log_handler (const gchar *log_domain, GLogLevelFlags log_level, const gchar *
sd_journal_print (priority, "%s", message);
}

namespace rpmostreecxx {
namespace rpmostreecxx
{
// This function is always called from the Rust side. Hopefully
// soon we'll move more of this code into daemon.rs.
void
daemon_init_inner (bool debug)
daemon_init_inner (bool debug)
{
g_autoptr(GError) local_error = NULL;
g_autoptr (GError) local_error = NULL;
if (debug)
{
g_autoptr (GIOChannel) channel = NULL;
Expand All @@ -244,17 +245,17 @@ daemon_init_inner (bool debug)
/* Get an explicit ref to the bus so we can use it later */
g_autoptr (GDBusConnection) bus = g_bus_get_sync (G_BUS_TYPE_SYSTEM, NULL, &local_error);
if (!bus)
util::throw_gerror(local_error);
util::throw_gerror (local_error);
if (!start_daemon (bus, &local_error))
{
sd_notifyf (0, "STATUS=error: %s", local_error->message);
util::throw_gerror(local_error);
util::throw_gerror (local_error);
}
}

// Called from rust side to enter mainloop.
void
daemon_main_inner ()
daemon_main_inner ()
{
state_transition (APPSTATE_RUNNING);

Expand Down Expand Up @@ -293,13 +294,10 @@ daemon_main_inner ()
} /* namespace */

gboolean
rpmostree_builtin_start_daemon (int argc,
char **argv,
RpmOstreeCommandInvocation *invocation,
GCancellable *cancellable,
GError **error)
rpmostree_builtin_start_daemon (int argc, char **argv, RpmOstreeCommandInvocation *invocation,
GCancellable *cancellable, GError **error)
{
g_autoptr(GOptionContext) opt_context = g_option_context_new (" - start the daemon process");
g_autoptr (GOptionContext) opt_context = g_option_context_new (" - start the daemon process");
g_option_context_add_main_entries (opt_context, opt_entries, NULL);

if (!g_option_context_parse (opt_context, &argc, &argv, error))
Expand Down
1 change: 1 addition & 0 deletions src/daemon/rpmostreed-daemon.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ rpmostreed_daemon_run_until_idle_exit (RpmostreedDaemon *self)
update_status (self);
while (self->running)
g_main_context_iteration (NULL, TRUE);
rpmostreecxx::daemon_terminate ();
}

void
Expand Down

0 comments on commit d8997f3

Please sign in to comment.