Skip to content

Commit

Permalink
Merge pull request #1648 from rstudio/positron-repl_python
Browse files Browse the repository at this point in the history
Update `repl_python()` to use independent Console session in Positron
  • Loading branch information
t-kalinowski authored Sep 3, 2024
2 parents f287d15 + 5a3aa2d commit 27d6e73
Show file tree
Hide file tree
Showing 24 changed files with 627 additions and 144 deletions.
2 changes: 2 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ __pycache__
^\.dockerignore$
^\.git-blame-ignore-revs$
^rchk$
^\.cache$
^compile_commands\.json$
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ README.html
index.html
inst/doc
.vscode/
rchk
rchk
.cache
compile_commands.json
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ LinkingTo: Rcpp
RoxygenNote: 7.3.2
Roxygen: list(markdown = TRUE)
VignetteBuilder: knitr
Config/build/compilation-database: true
13 changes: 12 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@

- Python background threads can now run in parallel with
the R session (#1641).


- In Positron, `repl_python()` will now use the Positron Python console. (#1648)

- `py_main_thread_func()` is deprecated; every R function can now safely be
called from background Python threads. (#1648)

- Calls from a Python thread into R will now notify the main thread using
R's native event loop, ensuring that these calls are handled even when
the main thread is engaged in non-Python tasks. (#1648)

- The Python session is now finalized when the R session exits (#1648).

- Internal updates for NumPy 2.1 (#1651)

- Fixed error when importing a module named `config` (#1628)
Expand Down
8 changes: 8 additions & 0 deletions R/package.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ is_python_initialized <- function() {
!is.null(.globals$py_config)
}

is_python_finalized <- function() {
identical(.globals$finalized, TRUE)
}

ensure_python_initialized <- function(required_module = NULL) {

# nothing to do if python is initialized
if (is_python_initialized())
return()

if (is_python_finalized())
stop("py_initialize() cannot be called more than once per R session or after py_finalize(). Please start a new R session.")

# notify front-end (if any) that Python is about to be initialized
callback <- getOption("reticulate.python.beforeInitialized")
if (is.function(callback))
Expand Down Expand Up @@ -218,6 +224,8 @@ initialize_python <- function(required_module = NULL, use_environment = NULL) {

)

reg.finalizer(.globals, function(e) py_finalize(), onexit = TRUE)

# set available flag indicating we have py bindings
config$available <- TRUE

Expand Down
1 change: 1 addition & 0 deletions R/python.R
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ py_inject_hooks <- function() {
}

name <- if (is_python3()) "input" else "raw_input"
.globals$og_input_builtin <- builtins[[name]]
builtins[[name]] <- input
}

Expand Down
13 changes: 13 additions & 0 deletions R/repl.R
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ repl_python <- function(
on.exit(teardown(), add = TRUE)
}


ensure_python_initialized()
if (is.null(input) &&
Sys.getenv("POSITRON") == "1" &&
exists(".ps.reticulate_open", inherits = TRUE)) {

eval(call(".ps.reticulate_open"))

# TODO: seems we need to rerun py_inject_r(), possibly other init hooks.
# TODO: kernal initializion drops pre-existing objects in __main__
return(invisible())
}

# split provided code on newlines
if (!is.null(input))
input <- unlist(strsplit(input, "\n", fixed = TRUE))
Expand Down
55 changes: 52 additions & 3 deletions R/thread.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@


#' Create a Python function that will always be called on the main thread
#' [Deprecated] Create a Python function that will always be called on the main thread
#'
#' Beginning with reticulate v1.39.0, every R function is a "main thread func". Usage of `py_main_thread_func()`
#' is no longer necessary.
#'
#' This function is helpful when you need to provide a callback to a Python
#' library which may invoke the callback on a background thread. As R functions
Expand All @@ -13,7 +16,53 @@
#'
#'
#' @export
#' @keywords internal
py_main_thread_func <- function(f) {
tools <- import("rpytools")
tools$thread$main_thread_func(f)
r_to_py(f, TRUE) # every R func is a main thread func.
}


py_allow_threads <- function(allow = TRUE) {
if (allow) {
reticulate_ns <- environment(sys.function())
for (f in sys.frames()) {
if (identical(parent.env(f), reticulate_ns) &&
!identical(f, environment()))
# Can't release the gil as unlocked while we're holding it
# elsewhere on the callstack.
stop("Python threads can only be unblocked from a top-level reticulate call")
}
}

if (!was_python_initialized_by_reticulate())
stop("Can't safely unblock threads when R is running embedded")

invisible(py_allow_threads_impl(allow))
}



## TODO: document how to use sys.unraisablehook() to customize handling of exceptions
## from background threads. Or, switch to using the threading module, which
## has more options for customizing exceptions hooks, and document that.
## TODO: give a meaningful name for the thread that appears in tracebacks.
## Either use the threading module and pass name=,
## or do something like
## f = lambda file: run_file(file)
## f.__name__ = "run: " + os.path.basename(file)
py_run_file_on_thread <- function(file, ..., args = NULL) {
if (!is.null(args))
args <- as.list(as.character(args))
import("rpytools.run")$`_launch_lsp_server_on_thread`(file, args)
}

## used in Positron:
# reticulate:::py_run_file_on_thread(
# file = "${kernelPath}",
# args = c(
# "-f", "${connnectionFile}",
# "--logfile", "${logFile}",
# "--loglevel", "${logLevel}",
# "--session-mode", "console"
# )
# )
19 changes: 0 additions & 19 deletions R/threads.R

This file was deleted.

4 changes: 1 addition & 3 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,5 @@


# .onUnload <- function(libpath) {
# ## Don't acquire GIL if we're embedded; https://github.com/rpy2/rpy2/issues/872
# # py_finalize()
# # py_clear_last_error()
# py_finalize() # called from reg.finalizer(.globals) instead.
# }
2 changes: 1 addition & 1 deletion inst/python/rpytools/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _tend_queue(self, min_fetch=0):
if threading.current_thread() is not threading.main_thread():
if not self._pending_tend_queue:
self._pending_tend_queue = True
rpycall.call_python_function_on_main_thread(
rpycall.schedule_python_function_on_main_thread(
self._tend_queue, min_fetch
)
return
Expand Down
41 changes: 26 additions & 15 deletions inst/python/rpytools/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,56 @@ def run_file(path):
with open(path, "r") as file:
file_content = file.read()

d = sys.modules["__main__"].__dict__
# ipykernel patches the loader, so that
# `import __main__` does not produce the "real" __main__, but rather,
# the facade that is the user facing __main__
# to get the "real" main, do:
# d = sys.modules["__main__"].__dict__
from __main__ import __dict__ as d

exec(file_content, d, d)


class RunMainScriptContext:
def __init__(self, path, args):
def __init__(self, path, argv=None):
self.path = path
self.args = tuple(args)
self.argv = tuple(argv) if argv is not None else None

def __enter__(self):
sys.path.insert(0, os.path.dirname(self.path))

self._orig_sys_argv = sys.argv
sys.argv = [self.path] + list(self.args)
if self.argv is not None:
self._orig_sys_argv = sys.argv
sys.argv = [self.path] + list(self.argv)

def __exit__(self, *_):
# try restore sys.path
try:
sys.path.remove(os.path.dirname(self.path))
except ValueError:
pass
# restore sys.argv if it's been unmodified
# otherwise, leave it as-is.
set_argv = [self.path] + list(self.args)
if sys.argv == set_argv:
sys.argv = self._orig_sys_argv
# try restore sys.argv if we patched it
if self.argv is not None:
# restore sys.argv if it's unmodified from what we set it to.
# otherwise, leave it as-is.
patched_argv = [self.path] + list(self.args)
if sys.argv == patched_argv:
sys.argv = self._orig_sys_argv


def _run_file_on_thread(path, args=None):
def _launch_lsp_server_on_thread(path, args):
# used by Positron reticulate launcher...
# TODO: update Positron to replace usage of this with `run_file_on_thread()`

import _thread
return run_file_on_thread(path, args)

_thread.start_new_thread(run_file, (path, ))


def _launch_lsp_server_on_thread(path, args):
def run_file_on_thread(path, args=None):
# for now, leave sys.argv and sys.path permanently modified.
# Later, revisit if it's desirable/safe to restore after the initial
# lsp event loop startup.
RunMainScriptContext(path, args).__enter__()
_run_file_on_thread(path)
import _thread

_thread.start_new_thread(run_file, (path,))
37 changes: 14 additions & 23 deletions inst/python/rpytools/thread.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
import rpycall
import threading

import sys
import queue as queue
from rpycall import call_r_function, schedule_python_function_on_main_thread

is_py2 = sys.version[0] == "2"
if is_py2:
import Queue as queue
else:
import queue as queue


def main_thread_func(f):

def python_function(*args, **kwargs):

if isinstance(threading.current_thread(), threading._MainThread):
res = f(*args, **kwargs)
else:
result = queue.Queue()
rpycall.call_python_function_on_main_thread(
lambda: result.put(f(*args, **kwargs)), None
)
res = result.get()
def safe_call_r_function(f, args, kwargs):
try:
return call_r_function(f, *args, **kwargs), None
except Exception as e: # TODO: should we catch BaseException too? KeyboardInterrupt?
return None, e

return res

return python_function
def safe_call_r_function_on_main_thread(f, *args, **kwargs):
result = queue.Queue()
schedule_python_function_on_main_thread(
lambda: result.put(safe_call_r_function(f, args, kwargs)),
None,
)
return result.get()
7 changes: 6 additions & 1 deletion man/py_main_thread_func.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions src/event_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ namespace {
volatile sig_atomic_t s_pollingRequested;
bool s_flush_std_buffers = true;

bool running = true;
tthread::thread* t = nullptr;

// Forward declarations
int pollForEvents(void*);

Expand All @@ -49,10 +52,7 @@ int pollForEvents(void*);
// the polling signal will not be set).
void eventPollingWorker(void *) {

while (true) {

// Throttle via sleep
tthread::this_thread::sleep_for(tthread::chrono::milliseconds(500));
while (running) {

// Schedule polling on the main thread if the interpeter is still running.
// Note that Py_AddPendingCall is documented to be callable from a background
Expand All @@ -65,6 +65,9 @@ void eventPollingWorker(void *) {
Py_AddPendingCall(pollForEvents, NULL);
}

// Throttle via sleep
tthread::this_thread::sleep_for(tthread::chrono::milliseconds(500));

}

}
Expand Down Expand Up @@ -138,9 +141,24 @@ int pollForEvents(void*) {

// Initialize event loop polling background thread
void initialize() {
tthread::thread t(eventPollingWorker, NULL);
t.detach();
running = true;
t = new tthread::thread(eventPollingWorker, NULL);
}

void deinitialize(bool wait) {
// signal the thread to stop
running = false;

// clear the ref
if (t) {
if (wait) { // default: false
t->join();
delete t; // Clean up the thread object
t = nullptr;
}
}
}


} // namespace event_loop
} // namespace reticulate
Loading

0 comments on commit 27d6e73

Please sign in to comment.