Skip to content

Commit

Permalink
Daemons to indicate reason for termination (#183)
Browse files Browse the repository at this point in the history
* ensure graceful exit

* have daemon() return exit codes

* return invisibly
  • Loading branch information
shikokuchuo authored Jan 9, 2025
1 parent b926f1a commit 7eb8bb4
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 19 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# mirai 2.0.0.9000 (development)

#### Updates

* `daemon()` now returns an integer exit value to indicate the reason for termination.

# mirai 2.0.0

#### New Architecture
Expand Down
45 changes: 33 additions & 12 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@
#' \sQuote{...} argument to \code{\link{daemons}} or
#' \code{\link{launch_local}} to provide redirection of output to the host
#' process (applicable only for local daemons).
#' @param maxtasks [default Inf] integer maximum number of tasks to execute
#' (task limit) before exiting.
#' @param idletime [default Inf] integer milliseconds maximum time to wait for a
#' task (idle time) before exiting.
#' @param walltime [default Inf] integer milliseconds soft walltime (time limit)
#' i.e. the minimum amount of real time elapsed before exiting.
#' @param maxtasks [default Inf] integer maximum number of tasks to execute
#' (task limit) before exiting.
#' @param id [default NULL] (optional) integer daemon ID provided to dispatcher
#' to track connection status. Causes \code{\link{status}} to report this ID
#' under \code{$events} when the daemon connects and disconnects.
Expand All @@ -73,7 +73,9 @@
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host process
#' and should not be independently supplied.
#'
#' @return Invisible NULL.
#' @return Invisibly, an integer exit code: 0L for normal termination, and a
#' positive value if a self-imposed limit was reached: 1L (idletime), 2L
#' (walltime), 3L (maxtasks).
#'
#' @section Persistence:
#'
Expand All @@ -96,13 +98,13 @@
#' @export
#'
daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = TRUE,
cleanup = TRUE, output = FALSE, maxtasks = Inf, idletime = Inf,
walltime = Inf, id = NULL, tls = NULL, rs = NULL) {
cleanup = TRUE, output = FALSE, idletime = Inf, walltime = Inf,
maxtasks = Inf, id = NULL, tls = NULL, rs = NULL) {

missing(dispatcher) && return(
v1_daemon(url = url, asyncdial = asyncdial, autoexit = autoexit,
cleanup = cleanup, output = output, maxtasks = maxtasks,
idletime = idletime, walltime = walltime, ..., tls = tls, rs = rs)
cleanup = cleanup, output = output, idletime = idletime,
walltime = walltime, maxtasks = maxtasks, ..., tls = tls, rs = rs)
)

cv <- cv()
Expand All @@ -125,6 +127,7 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
}, add = TRUE)
}
snapshot()
xc <- 0L
task <- 1L
timeout <- if (idletime > walltime) walltime else if (is.finite(idletime)) idletime
maxtime <- if (is.finite(walltime)) mclock() + walltime else FALSE
Expand All @@ -133,7 +136,7 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
aio <- recv_aio(sock, mode = 1L, cv = cv)
if (is.numeric(id))
send(sock, c(.intmax, as.integer(id)), mode = 2L, block = TRUE)
wait(cv) || return()
wait(cv) || return(invisible(xc))
serial <- collect_aio(aio)
if (is.list(serial))
`opt<-`(sock, "serial", serial)
Expand All @@ -142,13 +145,23 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
wait(cv) || break
m <- collect_aio(aio)
is.integer(m) && {
m == 5L && break
m == 5L && {
xc <- 1L
break
}
next
}
cancel <- recv_aio(sock, mode = 8L, cv = NA)
data <- eval_mirai(m)
stop_aio(cancel)
{ task >= maxtasks || maxtime && mclock() >= maxtime } && .mark()
{ task >= maxtasks || maxtime && mclock() >= maxtime } && {
.mark()
send(sock, data, mode = 1L, block = TRUE)
aio <- recv_aio(sock, mode = 8L, cv = cv)
xc <- 2L + task >= maxtasks
wait(cv)
break
}
send(sock, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
task <- task + 1L
Expand All @@ -159,15 +172,23 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
aio <- recv_aio(ctx, mode = 1L, timeout = timeout, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.integer(m) && break
is.integer(m) && {
xc <- 1L
break
}
data <- eval_mirai(m)
send(ctx, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
{ task >= maxtasks || maxtime && mclock() >= maxtime } && break
{ task >= maxtasks || maxtime && mclock() >= maxtime } && {
xc <- 2L + task >= maxtasks
break
}
task <- task + 1L
}
}

invisible(xc)

}

#' dot Daemon
Expand Down
3 changes: 2 additions & 1 deletion R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
next
}
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
send(psock, ._scm_., mode = 2L, pipe = outq[[id]][["pipe"]], block = TRUE)
send(psock, 0L, mode = 2L, pipe = outq[[id]][["pipe"]], block = TRUE)
if (length(outq[[id]][["dmnid"]]))
events <- c(events, outq[[id]][["dmnid"]])
outq[[id]] <- NULL
next
} else {
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
outq[[id]][["msgid"]] <- 0L
Expand Down
12 changes: 7 additions & 5 deletions man/daemon.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
res <- status()
test_zero(res$connections)
test_identical(res$events, c(125L, -125L))
test_equal(status()$mirai[["awaiting"]], 1L)
test_equal(res$mirai[["awaiting"]], 1L)
test_equal(launch_local(1, idletime = 5000L, walltime = 500L, id = 129L), 1L)
test_zero(m[])
Sys.sleep(1L)
Expand Down

0 comments on commit 7eb8bb4

Please sign in to comment.