Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: GSoC21 - python eventloop using Boost.Asio strand #365

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ lib boost_python
import.cpp
exec.cpp
object/function_doc_signature.cpp
eventloop.cpp
: # requirements
<link>static:<define>BOOST_PYTHON_STATIC_LIB
<define>BOOST_PYTHON_SOURCE
Expand Down
95 changes: 95 additions & 0 deletions doc/reference/eventloop.qbk
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
[chapter Event Loop
[quickbook 1.7]
]

[section boost/python/eventloop.hpp]

[section Introduction]
Provide a Boost.Asio-based implementation for the Python [@https://docs.python.org/3/library/asyncio-eventloop.html `EventLoop`] type. Every callback is scheduled in strand.
[endsect]

[section Function `set_default_event_loop`]
``
void set_default_event_loop(const boost::asio::io_context::strand& strand);
``
[variablelist
[[Effect][construct an `event_loop` object using provided [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/overview/core/strands.html `strand`] object. Setup a new [@https://docs.python.org/3/library/asyncio-policy.html event loop policy], when user call `get_event_loop` using that policy, it returns the Boost Asio `event_loop` object]]
[[Throws][nothing]]
]
[endsect]

[section Function `PyInit_boost_event_loop`]
``
extern "C"
{
PyObject* PyInit_boost_event_loop();
}
``
[variablelist
[[Effect][user must call `PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop);` before [@https://docs.python.org/3/c-api/init.html#c.Py_Initialize `Py_Initialize`]]]
[[Throws][nothing]]
]
[endsect]

[section Example]
``
// example.cpp
io_context ctx;
io_context::strand st{ctx};

PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop);
Py_Initialize();
set_default_event_loop(st);

object py_module = import("example.py");
py_module.attr("hello_world")();
st.context().run();

// example.py
import asyncio
def hello_world():
print("hello world")

def call_event_loop():
loop = asyncio.get_event_loop_policy().get_event_loop()
loop.call_soon(hello_world)
``
Note: `set_default_event_loop` must be called before any Python module is imported. Otherwise it may result in the module-level variables registered against the default asyncio eventloop instead the boost asio eventloop. Here is an example demonstrating the issue.
``
// bad_import.cpp
Py_Initialize();
import("example_module"); // example_module is initialized
set_default_event_loop(st); // boost_asio_eventloop is set to default, but the example_module.lock was registered against the old eventloop

// example_module.py
import asyncio
lock = asyncio.Lock()
``
[endsect]

[section Event Loop and Multiple Python Sub-interpreters]
It's allowed to have multiple Python sub-interpreter instances in a same program. Each interpreter will act as a guest VM, and C++ host will schedule all the asynchronous events committed by the Python VM.[br]
The Python interpreter must outlive the [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/reference/io_service.html `asio::io_context`] objects it owns. It's not safe to destroy the interpreter midways.[br]
[endsect]

[section [@https://docs.python.org/3/c-api/init.html#thread-state-and-the-global-interpreter-lock `GIL`]]
`boost::asio::io_context::run()` may be called from multiple threads and the completion handlers may wakeup on different threads as well. The GIL must be released after setting up the Python IO object and before the call to `boost::asio::io_context::run()`. In the completion handler, the GIL must be reacquired and release before it calls into Python to deliver the result.
``
// gil.cpp
py_func(); // call func imported from gil.py
PyEval_ReleaseLock(); // release GIL
ctx.run();

// gil.py
import asyncio

def hello():
print("hello_world")

def func():
loop = asyncio.get_event_loop_policy().get_event_loop()
loop.call_soon(hello)
``
[endsect]

[endsect]
179 changes: 179 additions & 0 deletions include/boost/python/eventloop.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright Pan Yue 2021.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// TODO:
// 1. posix::stream_descriptor need windows version
// 2. call_* need return async.Handle
# ifndef EVENT_LOOP_PY2021_HPP
# define EVENT_LOOP_PY2021_HPP

#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/python.hpp>
#include <boost/mpl/vector.hpp>

namespace boost { namespace python { namespace asio {

class event_loop
{
public:
event_loop(const boost::asio::io_context::strand& strand): _strand{strand}
{
try
{
_pymod_ssl = import("ssl");
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_ImportError))
{
PyErr_Clear();
}
}
}

// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
inline void call_soon(object f)
{
_strand.post([f]{
PyEval_AcquireLock();
f();
PyEval_ReleaseLock();
});
}

// TODO: implement this
inline void call_soon_thread_safe(object f) {};

// Schedule callback to be called after the given delay number of seconds
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
void call_later(double delay, object f);

void call_at(double when, object f);

inline double time()
{
return std::chrono::steady_clock::now().time_since_epoch().count();
}

inline void add_reader(int fd, object f)
{
_async_wait_fd(fd, f, _read_key(fd));
}

inline void remove_reader(int fd)
{
_descriptor_map.erase(_read_key(fd));
}

inline void add_writer(int fd, object f)
{
_async_wait_fd(fd, f, _write_key(fd));
}

inline void remove_writer(int fd)
{
_descriptor_map.erase(_write_key(fd));
}


object sock_recv(object sock, size_t nbytes);

object sock_recv_into(object sock, object buffer);

object sock_sendall(object sock, object data);

object sock_connect(object sock, object address);

object sock_accept(object sock);

object sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true);

object start_tls(object transport, object protocol, object sslcontext,
bool server_side = false,
object server_hostname = object(),
object ssl_handshake_timeout = object());

object getaddrinfo(object host, int port, int family = 0, int type = 0, int proto = 0, int flags = 0);

object getnameinfo(object sockaddr, int flags = 0);

void set_exception_handler(object handler)
{
if (handler != object() && !PyObject_HasAttrString(handler.ptr(), "__call__")) {
PyErr_SetString(PyExc_TypeError, "A callable object or None is expected");
throw_error_already_set();
}
_exception_handler = handler;
}

object get_exception_handler()
{
return _exception_handler;
}

void default_exception_handler(object context);

void call_exception_handler(object context);

object create_future();

// TODO
inline bool get_debug()
{
return false;
}

private:
object _pymod_ssl = object();
object _pymod_socket = import("socket");
object _pymod_traceback = import("traceback");
object _pymod_asyncio_futures = import("asyncio").attr("futures");
object _py_logger = import("asyncio.log").attr("logger");
object _pymod_concurrent_futures = import("concurrent").attr("futures");
object _exception_handler = object();
boost::asio::io_context::strand _strand;
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
std::unordered_map<int, std::unique_ptr<boost::asio::posix::stream_descriptor>> _descriptor_map;

inline int _read_key(int fd)
{
return fd * 2;
}

inline int _write_key(int fd)
{
return fd * 2 + 1;
}

template<typename F>
void _async_wait_fd(int fd, F f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<boost::asio::posix::stream_descriptor>(_strand.context(), fd))
);
}

_descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read,
boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec)
{
_descriptor_map.erase(key);
f();
}));
return;
}

static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr);
static void _sock_accept(event_loop& loop, object fut, object sock);
};

void set_default_event_loop(const boost::asio::io_context::strand& strand);

}}} // namespace boost::python::asio

# endif
Loading