diff --git a/build/Jamfile b/build/Jamfile index dbc9fb2036..bfb6c5539b 100644 --- a/build/Jamfile +++ b/build/Jamfile @@ -68,6 +68,7 @@ lib boost_python import.cpp exec.cpp object/function_doc_signature.cpp + eventloop.cpp : # requirements static:BOOST_PYTHON_STATIC_LIB BOOST_PYTHON_SOURCE diff --git a/doc/reference/eventloop.qbk b/doc/reference/eventloop.qbk new file mode 100644 index 0000000000..42f1ba5145 --- /dev/null +++ b/doc/reference/eventloop.qbk @@ -0,0 +1,95 @@ +[chapter Event Loop + [quickbook 1.7] +] + +[section boost/python/eventloop.hpp] + +[section Introduction] +Provide a Boost.Asio-based implementation for the Python [@https://docs.python.org/3/library/asyncio-eventloop.html `EventLoop`] type. Every callback is scheduled in strand. +[endsect] + +[section Function `set_default_event_loop`] +`` +void set_default_event_loop(const boost::asio::io_context::strand& strand); +`` +[variablelist +[[Effect][construct an `event_loop` object using provided [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/overview/core/strands.html `strand`] object. Setup a new [@https://docs.python.org/3/library/asyncio-policy.html event loop policy], when user call `get_event_loop` using that policy, it returns the Boost Asio `event_loop` object]] +[[Throws][nothing]] +] +[endsect] + +[section Function `PyInit_boost_event_loop`] +`` +extern "C" +{ + PyObject* PyInit_boost_event_loop(); +} +`` +[variablelist +[[Effect][user must call `PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop);` before [@https://docs.python.org/3/c-api/init.html#c.Py_Initialize `Py_Initialize`]]] +[[Throws][nothing]] +] +[endsect] + +[section Example] +`` +// example.cpp +io_context ctx; +io_context::strand st{ctx}; + +PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop); +Py_Initialize(); +set_default_event_loop(st); + +object py_module = import("example.py"); +py_module.attr("hello_world")(); +st.context().run(); + +// example.py +import asyncio +def hello_world(): + print("hello world") + +def call_event_loop(): + loop = asyncio.get_event_loop_policy().get_event_loop() + loop.call_soon(hello_world) +`` +Note: `set_default_event_loop` must be called before any Python module is imported. Otherwise it may result in the module-level variables registered against the default asyncio eventloop instead the boost asio eventloop. Here is an example demonstrating the issue. +`` +// bad_import.cpp +Py_Initialize(); +import("example_module"); // example_module is initialized +set_default_event_loop(st); // boost_asio_eventloop is set to default, but the example_module.lock was registered against the old eventloop + +// example_module.py +import asyncio +lock = asyncio.Lock() +`` +[endsect] + +[section Event Loop and Multiple Python Sub-interpreters] +It's allowed to have multiple Python sub-interpreter instances in a same program. Each interpreter will act as a guest VM, and C++ host will schedule all the asynchronous events committed by the Python VM.[br] +The Python interpreter must outlive the [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/reference/io_service.html `asio::io_context`] objects it owns. It's not safe to destroy the interpreter midways.[br] +[endsect] + +[section [@https://docs.python.org/3/c-api/init.html#thread-state-and-the-global-interpreter-lock `GIL`]] +`boost::asio::io_context::run()` may be called from multiple threads and the completion handlers may wakeup on different threads as well. The GIL must be released after setting up the Python IO object and before the call to `boost::asio::io_context::run()`. In the completion handler, the GIL must be reacquired and release before it calls into Python to deliver the result. +`` +// gil.cpp +py_func(); // call func imported from gil.py +PyEval_ReleaseLock(); // release GIL +ctx.run(); + +// gil.py +import asyncio + +def hello(): + print("hello_world") + +def func(): + loop = asyncio.get_event_loop_policy().get_event_loop() + loop.call_soon(hello) +`` +[endsect] + +[endsect] \ No newline at end of file diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp new file mode 100644 index 0000000000..6fadbe409b --- /dev/null +++ b/include/boost/python/eventloop.hpp @@ -0,0 +1,179 @@ +// Copyright Pan Yue 2021. +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// TODO: +// 1. posix::stream_descriptor need windows version +// 2. call_* need return async.Handle +# ifndef EVENT_LOOP_PY2021_HPP +# define EVENT_LOOP_PY2021_HPP + +#include +#include +#include +#include + +namespace boost { namespace python { namespace asio { + +class event_loop +{ +public: + event_loop(const boost::asio::io_context::strand& strand): _strand{strand} + { + try + { + _pymod_ssl = import("ssl"); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_ImportError)) + { + PyErr_Clear(); + } + } + } + + // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. + inline void call_soon(object f) + { + _strand.post([f]{ + PyEval_AcquireLock(); + f(); + PyEval_ReleaseLock(); + }); + } + + // TODO: implement this + inline void call_soon_thread_safe(object f) {}; + + // Schedule callback to be called after the given delay number of seconds + // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. + void call_later(double delay, object f); + + void call_at(double when, object f); + + inline double time() + { + return std::chrono::steady_clock::now().time_since_epoch().count(); + } + + inline void add_reader(int fd, object f) + { + _async_wait_fd(fd, f, _read_key(fd)); + } + + inline void remove_reader(int fd) + { + _descriptor_map.erase(_read_key(fd)); + } + + inline void add_writer(int fd, object f) + { + _async_wait_fd(fd, f, _write_key(fd)); + } + + inline void remove_writer(int fd) + { + _descriptor_map.erase(_write_key(fd)); + } + + + object sock_recv(object sock, size_t nbytes); + + object sock_recv_into(object sock, object buffer); + + object sock_sendall(object sock, object data); + + object sock_connect(object sock, object address); + + object sock_accept(object sock); + + object sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); + + object start_tls(object transport, object protocol, object sslcontext, + bool server_side = false, + object server_hostname = object(), + object ssl_handshake_timeout = object()); + + object getaddrinfo(object host, int port, int family = 0, int type = 0, int proto = 0, int flags = 0); + + object getnameinfo(object sockaddr, int flags = 0); + + void set_exception_handler(object handler) + { + if (handler != object() && !PyObject_HasAttrString(handler.ptr(), "__call__")) { + PyErr_SetString(PyExc_TypeError, "A callable object or None is expected"); + throw_error_already_set(); + } + _exception_handler = handler; + } + + object get_exception_handler() + { + return _exception_handler; + } + + void default_exception_handler(object context); + + void call_exception_handler(object context); + + object create_future(); + + // TODO + inline bool get_debug() + { + return false; + } + +private: + object _pymod_ssl = object(); + object _pymod_socket = import("socket"); + object _pymod_traceback = import("traceback"); + object _pymod_asyncio_futures = import("asyncio").attr("futures"); + object _py_logger = import("asyncio.log").attr("logger"); + object _pymod_concurrent_futures = import("concurrent").attr("futures"); + object _exception_handler = object(); + boost::asio::io_context::strand _strand; + // read: key = fd * 2 + 0, write: key = fd * 2 + 1 + std::unordered_map> _descriptor_map; + + inline int _read_key(int fd) + { + return fd * 2; + } + + inline int _write_key(int fd) + { + return fd * 2 + 1; + } + + template + void _async_wait_fd(int fd, F f, int key) + { + // add descriptor + if (_descriptor_map.find(key) == _descriptor_map.end()) + { + _descriptor_map.emplace(key, + std::move(std::make_unique(_strand.context(), fd)) + ); + } + + _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, + boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec) + { + _descriptor_map.erase(key); + f(); + })); + return; + } + + static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr); + static void _sock_accept(event_loop& loop, object fut, object sock); +}; + +void set_default_event_loop(const boost::asio::io_context::strand& strand); + +}}} // namespace boost::python::asio + +# endif diff --git a/src/eventloop.cpp b/src/eventloop.cpp new file mode 100644 index 0000000000..47bface908 --- /dev/null +++ b/src/eventloop.cpp @@ -0,0 +1,475 @@ +// Copyright Pan Yue 2021. +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// TODO: +// 1. posix::stream_descriptor need windows version +// 2. call_* need return async.Handle +// 3. _ensure_fd_no_transport +// 4. _ensure_resolve + +#include +#include +#include +#include +#include +#include +#include + + +namespace boost { namespace python { namespace asio { +namespace +{ + +bool _hasattr(object o, const char* name) +{ + return PyObject_HasAttrString(o.ptr(), name); +} + +void raise_dup_error() +{ + PyErr_SetString(PyExc_OSError, std::system_category().message(errno).c_str()); + throw_error_already_set(); +} + +} // namespace + +void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr) +{ + try + { + object err = sock.attr("getsockopt")( + pymod_socket.attr("SOL_SOCKET"), pymod_socket.attr("SO_ERROR")); + if (err != object(0)) { + // TODO: print the address + PyErr_SetString(PyExc_OSError, "Connect call failed {address}"); + throw_error_already_set(); + } + fut.attr("set_result")(object()); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + // pass + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyErr_Clear(); + fut.attr("set_exception")(std::current_exception()); + } + } +} + +void event_loop::_sock_accept(event_loop& loop, object fut, object sock) +{ + int fd = extract(sock.attr("fileno")()); + object conn, address; + try + { + object ret = sock.attr("accept")(); + conn = ret[0]; + address = ret[1]; + conn.attr("setblocking")(object(false)); + fut.attr("set_result")(make_tuple(conn, address)); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd)); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyErr_Clear(); + fut.attr("set_exception")(std::current_exception()); + } + } +} + +void event_loop::call_later(double delay, object f) +{ + auto p_timer = std::make_shared( + _strand.context(), + std::chrono::duration_cast(std::chrono::duration(delay))); + p_timer->async_wait(boost::asio::bind_executor(_strand, + [f, p_timer] (const boost::system::error_code& ec) { + PyEval_AcquireLock(); + f(); + PyEval_ReleaseLock(); + })); +} + +void event_loop::call_at(double when, object f) +{ + using sc = std::chrono::steady_clock; + auto p_timer = std::make_shared( + _strand.context(), + sc::duration(static_cast(when))); + p_timer->async_wait(boost::asio::bind_executor(_strand, + [f, p_timer] (const boost::system::error_code& ec) {f();})); +} + +// TODO: support windows socket +object event_loop::sock_recv(object sock, size_t nbytes) +{ + int fd = extract(sock.attr("fileno")()); + int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); + object py_fut = create_future(); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { + PyEval_AcquireLock(); + std::vector buffer(nbytes); + read(fd, buffer.data(), nbytes); + py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); + PyEval_ReleaseLock(); + }, + _read_key(fd)); + return py_fut; +} + +// TODO: support windows socket +object event_loop::sock_recv_into(object sock, object buffer) +{ + int fd = extract(sock.attr("fileno")()); + int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); + ssize_t nbytes = len(buffer); + object py_fut = create_future(); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { + PyEval_AcquireLock(); + std::vector buffer(nbytes); + ssize_t nbytes_read = read(fd, buffer.data(), nbytes); + py_fut.attr("set_result")(nbytes_read); + PyEval_ReleaseLock(); + }, + _read_key(fd)); + return py_fut; +} + +// TODO: support windows socket +object event_loop::sock_sendall(object sock, object data) +{ + int fd = extract(sock.attr("fileno")()); + int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); + char const* py_str = extract(data.attr("decode")()); + ssize_t py_str_len = len(data); + object py_fut = create_future(); + _async_wait_fd(fd_dup, + [py_fut, fd, py_str, py_str_len] { + PyEval_AcquireLock(); + write(fd, py_str, py_str_len); + py_fut.attr("set_result")(object()); + PyEval_ReleaseLock(); + }, + _write_key(fd)); + return py_fut; +} + +// TODO: support windows socket +object event_loop::sock_connect(object sock, object address) +{ + + if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX")) + { + // TODO: _ensure_resolve + } + object py_fut = create_future(); + int fd = extract(sock.attr("fileno")()); + try + { + sock.attr("connect")(address); + py_fut.attr("set_result")(object()); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); + _async_wait_fd(fd_dup, bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyErr_Clear(); + py_fut.attr("set_exception")(std::current_exception()); + } + } + return py_fut; +} + +object event_loop::sock_accept(object sock) +{ + object py_fut = create_future(); + _sock_accept(*this, py_fut, sock); + return py_fut; +} + +// TODO: implement this +object event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) +{ + PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); + throw_error_already_set(); + return object(); +} + +// TODO: implement this +object event_loop::start_tls(object transport, object protocol, object sslcontext, + bool server_side, object server_hostname, object ssl_handshake_timeout) +{ + PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); + throw_error_already_set(); + return object(); +} + +object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) +{ + object py_fut = create_future(); + _strand.post( + [this, py_fut, host, port, family, type, proto, flags] { + PyEval_AcquireLock(); + object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); + py_fut.attr("set_result")(res); + PyEval_ReleaseLock(); + }); + return py_fut; +} + +object event_loop::getnameinfo(object sockaddr, int flags) +{ + object py_fut = create_future(); + _strand.post( + [this, py_fut, sockaddr, flags] { + PyEval_AcquireLock(); + object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); + py_fut.attr("set_result")(res); + PyEval_ReleaseLock(); + }); + return py_fut; +} + +void event_loop::default_exception_handler(object context) +{ + object message = context.attr("get")(str("message")); + if (message == object()) + { + message = str("Unhandled exception in event loop"); + } + + object exception = context.attr("get")(str("exception")); + object exc_info; + if (exception != object()) + { + exc_info = make_tuple(exception.attr("__class__"), exception, exception.attr("__traceback__")); + } + else + { + exc_info = object(false); + } + if (!PyObject_IsTrue(context.attr("__contains__")(str("source_traceback")).ptr()) && + _exception_handler != object() && + _exception_handler.attr("_source_traceback") != object()) + { + context["handle_traceback"] = _exception_handler.attr("_source_traceback"); + } + + list log_lines; + log_lines.append(message); + list context_keys(context.attr("keys")); + context_keys.sort(); + for (int i = 0; i < len(context_keys); i++) + { + std::string key = extract(context_keys[i]); + if (key == "message" || key == "exception") + continue; + str value(context[key]); + if (key == "source_traceback") + { + str tb = str("").join(_pymod_traceback.attr("format_list")(value)); + value = str("Object created at (most recent call last):\n"); + value += tb.rstrip(); + } + else if (key == "handle_traceback") + { + str tb = str("").join(_pymod_traceback.attr("format_list")(value)); + value = str("Handle created at (most recent call last):\n"); + value += tb.rstrip(); + } + else + { + value = str(value.attr("__str__")()); + } + std::ostringstream stringStream; + stringStream << key << ": " << value; + log_lines.append(str(stringStream.str())); + } + list args; + dict kwargs; + args.append(str("\n").join(log_lines)); + kwargs["exc_info"] = exc_info; + _py_logger.attr("error")(tuple(args), **kwargs); +} + +void event_loop::call_exception_handler(object context) +{ + if (_exception_handler == object()) + { + try + { + default_exception_handler(context); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyErr_Clear(); + list args; + dict kwargs; + args.append(str("Exception in default exception handler")); + kwargs["exc_info"] = true; + _py_logger.attr("error")(tuple(args), **kwargs); + } + } + } + else + { + try + { + _exception_handler(context); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyObject *ptype, *pvalue, *ptraceback; + PyErr_Fetch(&ptype, &pvalue, &ptraceback); + PyErr_NormalizeException(&ptype, &pvalue, &ptraceback); + object type{handle<>(ptype)}; + object value{handle<>(pvalue)}; + object traceback{handle<>(ptraceback)}; + try + { + dict tmp_dict; + tmp_dict["message"] = str("Unhandled error in exception handler"); + tmp_dict["exception"] = value; + tmp_dict["context"] = context; + default_exception_handler(tmp_dict); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + boost::python::list args; + boost::python::dict kwargs; + args.append(str("Exception in default exception handler")); + kwargs["exc_info"] = true; + _py_logger.attr("error")(tuple(args), **kwargs); + } + } + } + } + } +} + +object event_loop::create_future() +{ + boost::python::dict kwargs; + kwargs["loop"] = boost::ref(*this); + return _pymod_asyncio_futures.attr("Future")(*boost::python::tuple(), **kwargs); +} + +void set_default_event_loop(const boost::asio::io_context::strand& strand) +{ + class_("BoostAsioEventLoop", init()) + .def("call_soon", &event_loop::call_soon) + .def("call_soon_thread_safe", &event_loop::call_soon_thread_safe) + .def("call_later", &event_loop::call_later) + .def("call_at", &event_loop::call_at) + .def("time", &event_loop::time) + .def("add_reader", &event_loop::add_reader) + .def("remove_reader", &event_loop::remove_reader) + .def("add_writer", &event_loop::add_writer) + .def("remove_writer", &event_loop::remove_writer) + .def("sock_recv", &event_loop::sock_recv) + .def("sock_recv_into", &event_loop::sock_recv_into) + .def("sock_sendall", &event_loop::sock_sendall) + .def("sock_connect", &event_loop::sock_connect) + .def("sock_accept", &event_loop::sock_accept) + .def("sock_sendfile", &event_loop::sock_sendfile) + .def("start_tls", &event_loop::start_tls) + .def("getaddrinfo", &event_loop::getaddrinfo) + .def("getnameinfo", &event_loop::getnameinfo) + .def("set_exception_handler", &event_loop::set_exception_handler) + .def("get_exception_handler", &event_loop::get_exception_handler) + .def("default_exception_handler", &event_loop::default_exception_handler) + .def("call_exception_handler", &event_loop::call_exception_handler) + .def("create_future", &event_loop::create_future) + .def("get_debug", &event_loop::get_debug); + + object asyncio = import("asyncio"); + object abstract_policy = asyncio.attr("AbstractEventLoopPolicy"); + + dict method_dict; + std::shared_ptr p_loop = std::make_shared(strand); + + method_dict["get_event_loop"] = make_function( + [p_loop] (object e) {return object(boost::ref(*p_loop));}, + default_call_policies(), + boost::mpl::vector() + ); + + object class_boost_policy = call( + (PyObject*)&PyType_Type, + str("BoostEventLoopPolicy"), + boost::python::make_tuple(abstract_policy), + method_dict); + + object boost_policy_instance = class_boost_policy.attr("__call__")(); + asyncio.attr("set_event_loop_policy")(boost_policy_instance); +} + +}}} // namespace boost::python::asio \ No newline at end of file diff --git a/src/fabscript b/src/fabscript index 0ebeac6098..6c5b858351 100644 --- a/src/fabscript +++ b/src/fabscript @@ -40,7 +40,8 @@ bpl = library('boost_python' + root.py_suffix, 'wrapper.cpp', 'import.cpp', 'exec.cpp', - 'object/function_doc_signature.cpp'], + 'object/function_doc_signature.cpp', + 'eventloop.cpp'], dependencies=root.config, features=features + define('BOOST_PYTHON_SOURCE')) diff --git a/test/Jamfile b/test/Jamfile index 9a5c756956..1ca4d38f84 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -84,6 +84,7 @@ bpl-test crossmod_exception : crossmod_exception.py crossmod_exception_a.cpp crossmod_exception_b.cpp ] +[ bpl-test eventloop ] [ bpl-test injected ] [ bpl-test properties ] [ bpl-test return_arg ]