From e6b292feecfcbb5bde0f6ba1167fb865de116697 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Tue, 8 Jun 2021 14:56:48 +0000 Subject: [PATCH 01/15] add eventloop implementations for call_* and call_when_*_completed functions --- build/Jamfile | 1 + include/boost/python.hpp | 1 + include/boost/python/eventloop.hpp | 111 +++++++++++++++++++++++++++++ src/eventloop.cpp | 81 +++++++++++++++++++++ src/fabscript | 3 +- test/Jamfile | 1 + 6 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 include/boost/python/eventloop.hpp create mode 100644 src/eventloop.cpp diff --git a/build/Jamfile b/build/Jamfile index dbc9fb2036..bfb6c5539b 100644 --- a/build/Jamfile +++ b/build/Jamfile @@ -68,6 +68,7 @@ lib boost_python import.cpp exec.cpp object/function_doc_signature.cpp + eventloop.cpp : # requirements static:BOOST_PYTHON_STATIC_LIB BOOST_PYTHON_SOURCE diff --git a/include/boost/python.hpp b/include/boost/python.hpp index e484034103..a181afaeae 100644 --- a/include/boost/python.hpp +++ b/include/boost/python.hpp @@ -25,6 +25,7 @@ # include # include # include +# include # include # include # include diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp new file mode 100644 index 0000000000..78fb48c8a9 --- /dev/null +++ b/include/boost/python/eventloop.hpp @@ -0,0 +1,111 @@ +// Copyright Pan Yue 2021. +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// TODO: +// 1. posix::stream_descriptor need windows version +// 2. call_* need return async.Handle +# ifndef EVENT_LOOP_PY2021_H_ +# define EVENT_LOOP_PY2021_H_ + +#include +#include +#include + +namespace a = boost::asio; +namespace c = std::chrono; +namespace py = boost::python; + +namespace boost { namespace python { namespace eventloop { + +class EventLoop +{ +private: + int64_t _timer_id = 0; + a::io_context::strand _strand; + std::unordered_map> _id_to_timer_map; + // read: key = fd * 2 + 0, write: key = fd * 2 + 1 + std::unordered_map> _descriptor_map; + std::chrono::steady_clock::time_point _created_time; + + void _add_reader_or_writer(int fd, py::object f, int key); + void _remove_reader_or_writer(int key); + +public: + EventLoop(a::io_context& ctx): + _strand{ctx}, _created_time{std::chrono::steady_clock::now()} + { + } + + // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. + inline void call_soon(py::object f) + { + _strand.post([f, loop=this] { + f(boost::ref(*loop)); + }); + return; + } + + // TODO: implement this + inline void call_soon_thread_safe(py::object f) {}; + + // Schedule callback to be called after the given delay number of seconds + // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. + void call_later(double delay, py::object f); + + void call_at(double when, py::object f); + + inline double time() + { + return static_cast>(std::chrono::steady_clock::now() - _created_time).count(); + } + + // week 2 ......start...... + + inline void add_reader(int fd, py::object f) + { + _add_reader_or_writer(fd, f, fd * 2); + } + + inline void remove_reader(int fd) + { + _remove_reader_or_writer(fd * 2); + } + + inline void add_writer(int fd, py::object f) + { + _add_reader_or_writer(fd, f, fd * 2 + 1); + } + + inline void remove_writer(int fd) + { + _remove_reader_or_writer(fd * 2 + 1); + } + + + void sock_recv(py::object sock, int bytes); + + void sock_recv_into(py::object sock, py::object buffer); + + void sock_sendall(py::object sock, py::object data); + + void sock_connect(py::object sock, py::object address); + + void sock_accept(py::object sock); + + void sock_sendfile(py::object sock, py::object file, int offset = 0, int count = 0, bool fallback = true); + + // week 2 ......end...... + + void run() + { + _strand.context().run(); + } +}; + + +}}} + + +# endif diff --git a/src/eventloop.cpp b/src/eventloop.cpp new file mode 100644 index 0000000000..8977a33958 --- /dev/null +++ b/src/eventloop.cpp @@ -0,0 +1,81 @@ +// Copyright Pan Yue 2021. +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// TODO: +// 1. posix::stream_descriptor need windows version +// 2. call_* need return async.Handle + +#include +#include +#include + +namespace a = boost::asio; +namespace c = std::chrono; +namespace py = boost::python; + +namespace boost { namespace python { namespace eventloop { + +void EventLoop::_add_reader_or_writer(int fd, py::object f, int key) +{ + // add descriptor + if (_descriptor_map.find(key) == _descriptor_map.end()) + { + _descriptor_map.emplace(key, + std::move(std::make_unique(_strand.context(), fd)) + ); + } + + _descriptor_map.find(key)->second->async_wait(a::posix::descriptor::wait_type::wait_read, + a::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) + { + // move descriptor + auto iter = loop->_descriptor_map.find(key); + if (iter != loop->_descriptor_map.end()) + { + iter->second->release(); + loop->_descriptor_map.erase(iter); + } + loop->call_soon(f); + })); + return; +} + +void EventLoop::_remove_reader_or_writer(int key) +{ + auto iter = _descriptor_map.find(key); + if (iter != _descriptor_map.end()) + { + iter->second->release(); + _descriptor_map.erase(iter); + } +} + +void EventLoop::call_later(double delay, py::object f) +{ + // add timer + _id_to_timer_map.emplace(_timer_id, + std::move(std::make_unique(_strand.context(), + std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9)))) + ); + + _id_to_timer_map.find(_timer_id)->second->async_wait( + // remove timer + a::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec) + { + loop->_id_to_timer_map.erase(id); + loop->call_soon(f); + })); + _timer_id++; +} + +void EventLoop::call_at(double when, py::object f) +{ + double diff = when - time(); + if (diff > 0) + return call_later(diff, f); + return call_soon(f); +} + +}}} \ No newline at end of file diff --git a/src/fabscript b/src/fabscript index 0ebeac6098..6c5b858351 100644 --- a/src/fabscript +++ b/src/fabscript @@ -40,7 +40,8 @@ bpl = library('boost_python' + root.py_suffix, 'wrapper.cpp', 'import.cpp', 'exec.cpp', - 'object/function_doc_signature.cpp'], + 'object/function_doc_signature.cpp', + 'eventloop.cpp'], dependencies=root.config, features=features + define('BOOST_PYTHON_SOURCE')) diff --git a/test/Jamfile b/test/Jamfile index 9a5c756956..1ca4d38f84 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -84,6 +84,7 @@ bpl-test crossmod_exception : crossmod_exception.py crossmod_exception_a.cpp crossmod_exception_b.cpp ] +[ bpl-test eventloop ] [ bpl-test injected ] [ bpl-test properties ] [ bpl-test return_arg ] From 7d8fae83481cb85d279cdf9f1b955a17b8ff7ed6 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sat, 26 Jun 2021 17:06:01 +0000 Subject: [PATCH 02/15] add implementations of sock_* functions --- src/eventloop.cpp | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 8977a33958..cf2c8ab8b6 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -78,4 +78,34 @@ void EventLoop::call_at(double when, py::object f) return call_soon(f); } +void EventLoop::sock_recv(py::object sock, int bytes) +{ + +} + +void EventLoop::sock_recv_into(py::object sock, py::object buffer) +{ + +} + +void EventLoop::sock_sendall(py::object sock, py::object data) +{ + +} + +void EventLoop::sock_connect(py::object sock, py::object address) +{ + +} + +void EventLoop::sock_accept(py::object sock) +{ + +} + +void EventLoop::sock_sendfile(py::object sock, py::object file, int offset, int count, bool fallback) +{ + +} + }}} \ No newline at end of file From 5747a3af2eb1f43f565bf4111c254fdc8f1f968f Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 27 Jun 2021 18:08:31 +0000 Subject: [PATCH 03/15] Fixed namespace related issues - fixed namespace pollution - rename boost::python::eventloop to boost::python::asio - remove eventloop.hpp from include list in python.hpp - rename define guards in eventloop.cpp - reorder class members in order: public, protected, private - rename class EventLoop to event_loop - remove `run()` from eventloop --- include/boost/python.hpp | 1 - include/boost/python/eventloop.hpp | 66 ++++++++++++------------------ src/eventloop.cpp | 36 ++++++++-------- 3 files changed, 43 insertions(+), 60 deletions(-) diff --git a/include/boost/python.hpp b/include/boost/python.hpp index a181afaeae..e484034103 100644 --- a/include/boost/python.hpp +++ b/include/boost/python.hpp @@ -25,7 +25,6 @@ # include # include # include -# include # include # include # include diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 78fb48c8a9..4a7c3b30a7 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -6,40 +6,25 @@ // TODO: // 1. posix::stream_descriptor need windows version // 2. call_* need return async.Handle -# ifndef EVENT_LOOP_PY2021_H_ -# define EVENT_LOOP_PY2021_H_ +# ifndef EVENT_LOOP_PY2021_HPP +# define EVENT_LOOP_PY2021_HPP #include #include #include -namespace a = boost::asio; -namespace c = std::chrono; -namespace py = boost::python; +namespace boost { namespace python { namespace asio { -namespace boost { namespace python { namespace eventloop { - -class EventLoop +class event_loop { -private: - int64_t _timer_id = 0; - a::io_context::strand _strand; - std::unordered_map> _id_to_timer_map; - // read: key = fd * 2 + 0, write: key = fd * 2 + 1 - std::unordered_map> _descriptor_map; - std::chrono::steady_clock::time_point _created_time; - - void _add_reader_or_writer(int fd, py::object f, int key); - void _remove_reader_or_writer(int key); - public: - EventLoop(a::io_context& ctx): + event_loop(boost::asio::io_context& ctx): _strand{ctx}, _created_time{std::chrono::steady_clock::now()} { } // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. - inline void call_soon(py::object f) + inline void call_soon(object f) { _strand.post([f, loop=this] { f(boost::ref(*loop)); @@ -48,22 +33,20 @@ class EventLoop } // TODO: implement this - inline void call_soon_thread_safe(py::object f) {}; + inline void call_soon_thread_safe(object f) {}; // Schedule callback to be called after the given delay number of seconds // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. - void call_later(double delay, py::object f); + void call_later(double delay, object f); - void call_at(double when, py::object f); + void call_at(double when, object f); inline double time() { return static_cast>(std::chrono::steady_clock::now() - _created_time).count(); } - // week 2 ......start...... - - inline void add_reader(int fd, py::object f) + inline void add_reader(int fd, object f) { _add_reader_or_writer(fd, f, fd * 2); } @@ -73,7 +56,7 @@ class EventLoop _remove_reader_or_writer(fd * 2); } - inline void add_writer(int fd, py::object f) + inline void add_writer(int fd, object f) { _add_reader_or_writer(fd, f, fd * 2 + 1); } @@ -84,27 +67,30 @@ class EventLoop } - void sock_recv(py::object sock, int bytes); + void sock_recv(object sock, int bytes); - void sock_recv_into(py::object sock, py::object buffer); + void sock_recv_into(object sock, object buffer); - void sock_sendall(py::object sock, py::object data); + void sock_sendall(object sock, object data); - void sock_connect(py::object sock, py::object address); + void sock_connect(object sock, object address); - void sock_accept(py::object sock); + void sock_accept(object sock); - void sock_sendfile(py::object sock, py::object file, int offset = 0, int count = 0, bool fallback = true); + void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); - // week 2 ......end...... +private: + int64_t _timer_id = 0; + boost::asio::io_context::strand _strand; + std::unordered_map> _id_to_timer_map; + // read: key = fd * 2 + 0, write: key = fd * 2 + 1 + std::unordered_map> _descriptor_map; + std::chrono::steady_clock::time_point _created_time; - void run() - { - _strand.context().run(); - } + void _add_reader_or_writer(int fd, object f, int key); + void _remove_reader_or_writer(int key); }; - }}} diff --git a/src/eventloop.cpp b/src/eventloop.cpp index cf2c8ab8b6..c4ed980761 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -10,25 +10,23 @@ #include #include #include +#include -namespace a = boost::asio; -namespace c = std::chrono; -namespace py = boost::python; -namespace boost { namespace python { namespace eventloop { +namespace boost { namespace python { namespace asio { -void EventLoop::_add_reader_or_writer(int fd, py::object f, int key) +void event_loop::_add_reader_or_writer(int fd, object f, int key) { // add descriptor if (_descriptor_map.find(key) == _descriptor_map.end()) { _descriptor_map.emplace(key, - std::move(std::make_unique(_strand.context(), fd)) + std::move(std::make_unique(_strand.context(), fd)) ); } - _descriptor_map.find(key)->second->async_wait(a::posix::descriptor::wait_type::wait_read, - a::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) + _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, + boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) { // move descriptor auto iter = loop->_descriptor_map.find(key); @@ -42,7 +40,7 @@ void EventLoop::_add_reader_or_writer(int fd, py::object f, int key) return; } -void EventLoop::_remove_reader_or_writer(int key) +void event_loop::_remove_reader_or_writer(int key) { auto iter = _descriptor_map.find(key); if (iter != _descriptor_map.end()) @@ -52,17 +50,17 @@ void EventLoop::_remove_reader_or_writer(int key) } } -void EventLoop::call_later(double delay, py::object f) +void event_loop::call_later(double delay, object f) { // add timer _id_to_timer_map.emplace(_timer_id, - std::move(std::make_unique(_strand.context(), + std::move(std::make_unique(_strand.context(), std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9)))) ); _id_to_timer_map.find(_timer_id)->second->async_wait( // remove timer - a::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec) + boost::asio::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec) { loop->_id_to_timer_map.erase(id); loop->call_soon(f); @@ -70,7 +68,7 @@ void EventLoop::call_later(double delay, py::object f) _timer_id++; } -void EventLoop::call_at(double when, py::object f) +void event_loop::call_at(double when, object f) { double diff = when - time(); if (diff > 0) @@ -78,32 +76,32 @@ void EventLoop::call_at(double when, py::object f) return call_soon(f); } -void EventLoop::sock_recv(py::object sock, int bytes) +void event_loop::sock_recv(object sock, int bytes) { } -void EventLoop::sock_recv_into(py::object sock, py::object buffer) +void event_loop::sock_recv_into(object sock, object buffer) { } -void EventLoop::sock_sendall(py::object sock, py::object data) +void event_loop::sock_sendall(object sock, object data) { } -void EventLoop::sock_connect(py::object sock, py::object address) +void event_loop::sock_connect(object sock, object address) { } -void EventLoop::sock_accept(py::object sock) +void event_loop::sock_accept(object sock) { } -void EventLoop::sock_sendfile(py::object sock, py::object file, int offset, int count, bool fallback) +void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) { } From a2a2bbeaa59d88a5102fb834d0980df96940aab0 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 27 Jun 2021 18:20:00 +0000 Subject: [PATCH 04/15] add implementations for eventloop functions sock_* --- include/boost/python/eventloop.hpp | 9 +- src/eventloop.cpp | 195 +++++++++++++++++++++++++++-- 2 files changed, 190 insertions(+), 14 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 4a7c3b30a7..c8338d73d4 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -67,20 +67,21 @@ class event_loop } - void sock_recv(object sock, int bytes); + object sock_recv(object sock, size_t nbytes); - void sock_recv_into(object sock, object buffer); + size_t sock_recv_into(object sock, object buffer); - void sock_sendall(object sock, object data); + object sock_sendall(object sock, object data); void sock_connect(object sock, object address); - void sock_accept(object sock); + object sock_accept(object sock); void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); private: int64_t _timer_id = 0; + object _pymod_socket = import("socket"); boost::asio::io_context::strand _strand; std::unordered_map> _id_to_timer_map; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 diff --git a/src/eventloop.cpp b/src/eventloop.cpp index c4ed980761..676a66126f 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -6,14 +6,121 @@ // TODO: // 1. posix::stream_descriptor need windows version // 2. call_* need return async.Handle +// 3. _ensure_fd_no_transport +// 4. _ensure_resolve #include #include #include #include +#include +#include namespace boost { namespace python { namespace asio { +namespace +{ + +bool _hasattr(object o, const char* name) +{ + return PyObject_HasAttrString(o.ptr(), name); +} + +void _sock_recv_handler( + std::promise>& prom_data, + std::promise& prom_nbytes_read, + size_t nbytes, + int fd) +{ + std::vector buffer(nbytes); + prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes)); + prom_data.set_value(std::move(buffer)); +} + +void _sock_send_handler(std::promise& prom, int fd, const char *py_str, ssize_t len) +{ + size_t nwrite = write(fd, py_str, len); + prom.set_value(nwrite); +} + +void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future& fut, object sock, object addr) +{ + try + { + object err = sock.attr("getsockopt")( + pymod_socket.attr("SOL_SOCKET"), pymod_socket.attr("SO_ERROR")); + if (err != object(0)) { + // TODO: print the address + PyErr_SetString(PyExc_OSError, "Connect call failed {address}"); + } + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + // pass + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(); + } + } +} + +void _sock_accept(event_loop& loop, std::promise& prom, std::future& fut, object sock) +{ + int fd = extract(sock.attr("fileno")()); + object conn; + object address; + try + { + object ret = sock.attr("accept")(); + conn = ret[0]; + address = ret[1]; + conn.attr("setblocking")(object(false)); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + loop.add_reader(fd, make_function(bind( + _sock_accept, boost::ref(loop), boost::ref(prom), boost::ref(fut), sock), + default_call_policies(), boost::mpl::vector())); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(make_tuple(conn, address)); + } + } +} + +} void event_loop::_add_reader_or_writer(int fd, object f, int key) { @@ -76,34 +183,102 @@ void event_loop::call_at(double when, object f) return call_soon(f); } -void event_loop::sock_recv(object sock, int bytes) +object event_loop::sock_recv(object sock, size_t nbytes) { - + int fd = extract(sock.attr("fileno")()); + std::promise> prom_data; + std::future> fut_data = prom_data.get_future(); + std::promise prom_nbytes_read; + std::future fut_nbytes_read = prom_nbytes_read.get_future(); + add_reader(fd, make_function(bind(_sock_recv_handler, + boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + default_call_policies(), boost::mpl::vector())); + return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); } -void event_loop::sock_recv_into(object sock, object buffer) +size_t event_loop::sock_recv_into(object sock, object buffer) { - + int fd = extract(sock.attr("fileno")()); + ssize_t nbytes = len(buffer); + std::promise> prom_data; + std::future> fut_data = prom_data.get_future(); + std::promise prom_nbytes_read; + std::future fut_nbytes_read = prom_nbytes_read.get_future(); + add_reader(fd, make_function(bind(_sock_recv_handler, + boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + default_call_policies(), boost::mpl::vector())); + buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); + return fut_nbytes_read.get(); } -void event_loop::sock_sendall(object sock, object data) +object event_loop::sock_sendall(object sock, object data) { - + int fd = extract(sock.attr("fileno")()); + char const* py_str = extract(data.attr("decode")()); + ssize_t py_str_len = len(data); + std::promise prom; + std::future fut = prom.get_future(); + add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len), + default_call_policies(), boost::mpl::vector())); + fut.wait(); + return object(); } void event_loop::sock_connect(object sock, object address) { - + + if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX")) + { + // TODO: _ensure_resolve + } + std::promise prom; + std::future fut = prom.get_future(); + int fd = extract(sock.attr("fileno")()); + try + { + sock.attr("connect")(address); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_BlockingIOError) + || PyErr_ExceptionMatches(PyExc_InterruptedError)) + { + PyErr_Clear(); + add_writer(fd, make_function(bind( + _sock_connect_cb, _pymod_socket, boost::ref(prom), boost::ref(fut), sock, address), + default_call_policies(), boost::mpl::vector())); + } + else if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else if (PyErr_ExceptionMatches(PyExc_BaseException)) + { + PyErr_Clear(); + prom.set_exception(std::current_exception()); + } + else + { + PyErr_Clear(); + prom.set_value(); + } + } + fut.wait(); } -void event_loop::sock_accept(object sock) +object event_loop::sock_accept(object sock) { - + std::promise prom; + std::future fut = prom.get_future(); + _sock_accept(*this, prom, fut, sock); + return fut.get(); } +// TODO: implement this void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) { - + PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); } }}} \ No newline at end of file From 05108470323946d802d5f72ee649fe5fa87879e0 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Tue, 29 Jun 2021 17:23:54 +0000 Subject: [PATCH 05/15] add TLS Upgrade, DNS, Error Handling API --- include/boost/python/eventloop.hpp | 42 ++++++ src/eventloop.cpp | 209 ++++++++++++++++++++++++++--- 2 files changed, 233 insertions(+), 18 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index c8338d73d4..4debd30b7e 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -21,6 +21,17 @@ class event_loop event_loop(boost::asio::io_context& ctx): _strand{ctx}, _created_time{std::chrono::steady_clock::now()} { + try + { + _pymod_ssl = import("ssl"); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_ImportError)) + { + PyErr_Clear(); + } + } } // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. @@ -79,9 +90,40 @@ class event_loop void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); + void start_tls(object transport, object protocol, object sslcontext, + bool server_side = false, + object server_hostname = object(), + object ssl_handshake_timeout = object()); + + object getaddrinfo(object host, int port, int family = 0, int type = 0, int proto = 0, int flags = 0); + + object getnameinfo(object sockaddr, int flags = 0); + + void set_exception_handler(object handler) + { + if (handler != object() && !PyObject_HasAttrString(handler.ptr(), "__call__")) { + PyErr_SetString(PyExc_TypeError, "A callable object or None is expected"); + throw_error_already_set(); + } + _exception_handler = handler; + } + + object get_exception_handler() + { + return _exception_handler; + } + + void default_exception_handler(object context); + + void call_exception_handler(object context); + private: int64_t _timer_id = 0; + object _pymod_ssl = object(); object _pymod_socket = import("socket"); + object _pymod_traceback = import("traceback"); + object _pymod_logger = import("asyncio.log").attr("logger"); + object _exception_handler = object(); boost::asio::io_context::strand _strand; std::unordered_map> _id_to_timer_map; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 676a66126f..9335f731a4 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -9,6 +9,7 @@ // 3. _ensure_fd_no_transport // 4. _ensure_resolve +#include #include #include #include @@ -52,7 +53,9 @@ void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future if (err != object(0)) { // TODO: print the address PyErr_SetString(PyExc_OSError, "Connect call failed {address}"); + throw_error_already_set(); } + prom.set_value(); } catch (const error_already_set& e) { @@ -67,15 +70,10 @@ void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future { // raise } - else if (PyErr_ExceptionMatches(PyExc_BaseException)) - { - PyErr_Clear(); - prom.set_exception(std::current_exception()); - } else { PyErr_Clear(); - prom.set_value(); + prom.set_exception(std::current_exception()); } } } @@ -91,6 +89,7 @@ void _sock_accept(event_loop& loop, std::promise& prom, std::future& prom, std::future& prom, + object host, int port, int family, int type, int proto, int flags) +{ + object res = pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); + prom.set_value(res); +} + +void _getnameinfo_handler(object pymod_socket, std::promise& prom, object sockaddr, int flags) +{ + object res = pymod_socket.attr("getnameinfo")(sockaddr, flags); + prom.set_value(res); +} + } void event_loop::_add_reader_or_writer(int fd, object f, int key) @@ -237,6 +244,7 @@ void event_loop::sock_connect(object sock, object address) try { sock.attr("connect")(address); + prom.set_value(); } catch (const error_already_set& e) { @@ -253,15 +261,10 @@ void event_loop::sock_connect(object sock, object address) { // raise } - else if (PyErr_ExceptionMatches(PyExc_BaseException)) - { - PyErr_Clear(); - prom.set_exception(std::current_exception()); - } else { PyErr_Clear(); - prom.set_value(); + prom.set_exception(std::current_exception()); } } fut.wait(); @@ -279,6 +282,176 @@ object event_loop::sock_accept(object sock) void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) { PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); + throw_error_already_set(); +} + +// TODO: implement this +void event_loop::start_tls(object transport, object protocol, object sslcontext, + bool server_side, object server_hostname, object ssl_handshake_timeout) +{ + PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); + throw_error_already_set(); +} + +object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) +{ + std::promise prom; + std::future fut = prom.get_future(); + call_soon(make_function( + bind(_getaddrinfo_handler, _pymod_socket, boost::ref(prom), host, port, family, type, proto, flags), + default_call_policies(), + boost::mpl::vector())); + return fut.get(); +} + +object event_loop::getnameinfo(object sockaddr, int flags) +{ + std::promise prom; + std::future fut = prom.get_future(); + call_soon(make_function( + bind(_getnameinfo_handler, _pymod_socket, boost::ref(prom), sockaddr, flags), + default_call_policies(), + boost::mpl::vector())); + return fut.get(); +} + +void event_loop::default_exception_handler(object context) +{ + object message = context.attr("get")(str("message")); + if (message == object()) + { + message = str("Unhandled exception in event loop"); + } + + object exception = context.attr("get")(str("exception")); + object exc_info; + if (exception != object()) + { + exc_info = make_tuple(exception.attr("__class__"), exception, exception.attr("__traceback__")); + } + else + { + exc_info = object(false); + } + if (!PyObject_IsTrue(context.attr("__contains__")(str("source_traceback")).ptr()) && + _exception_handler != object() && + _exception_handler.attr("_source_traceback") != object()) + { + context["handle_traceback"] = _exception_handler.attr("_source_traceback"); + } + + list log_lines; + log_lines.append(message); + list context_keys(context.attr("keys")); + context_keys.sort(); + for (int i = 0; i < len(context_keys); i++) + { + std::string key = extract(context_keys[i]); + if (key == "message" || key == "exception") + continue; + str value(context[key]); + if (key == "source_traceback") + { + str tb = str("").join(_pymod_traceback.attr("format_list")(value)); + value = str("Object created at (most recent call last):\n"); + value += tb.rstrip(); + } + else if (key == "handle_traceback") + { + str tb = str("").join(_pymod_traceback.attr("format_list")(value)); + value = str("Handle created at (most recent call last):\n"); + value += tb.rstrip(); + } + else + { + value = str(value.attr("__str__")()); + } + std::ostringstream stringStream; + stringStream << key << ": " << value; + log_lines.append(str(stringStream.str())); + } + list args; + dict kwargs; + args.append(str("\n").join(log_lines)); + kwargs["exc_info"] = exc_info; + _pymod_logger.attr("error")(tuple(args), **kwargs); +} + +void event_loop::call_exception_handler(object context) +{ + if (_exception_handler == object()) + { + try + { + default_exception_handler(context); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyErr_Clear(); + list args; + dict kwargs; + args.append(str("Exception in default exception handler")); + kwargs["exc_info"] = true; + _pymod_logger.attr("error")(tuple(args), **kwargs); + } + } + } + else + { + try + { + _exception_handler(context); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + PyObject *ptype, *pvalue, *ptraceback; + PyErr_Fetch(&ptype, &pvalue, &ptraceback); + PyErr_NormalizeException(&ptype, &pvalue, &ptraceback); + object type(handle<>(ptype)); + object value(handle<>(pvalue)); + object traceback(handle<>(ptraceback)); + try + { + dict tmp_dict; + tmp_dict["message"] = str("Unhandled error in exception handler"); + tmp_dict["exception"] = value; + tmp_dict["context"] = context; + default_exception_handler(tmp_dict); + } + catch (const error_already_set& e) + { + if (PyErr_ExceptionMatches(PyExc_SystemExit) + || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) + { + // raise + } + else + { + boost::python::list args; + boost::python::dict kwargs; + args.append(str("Exception in default exception handler")); + kwargs["exc_info"] = true; + _pymod_logger.attr("error")(tuple(args), **kwargs); + } + } + } + } + } } + }}} \ No newline at end of file From d23977feb29f7befcb8f50493c8dd02ad47e0911 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sat, 10 Jul 2021 10:44:20 +0000 Subject: [PATCH 06/15] change event_loop constructor parameter from io_context to strand --- include/boost/python/eventloop.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 4debd30b7e..0ccc9a650c 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -18,8 +18,8 @@ namespace boost { namespace python { namespace asio { class event_loop { public: - event_loop(boost::asio::io_context& ctx): - _strand{ctx}, _created_time{std::chrono::steady_clock::now()} + event_loop(boost::asio::io_context::strand& strand): + _strand{strand}, _created_time{std::chrono::steady_clock::now()} { try { From 587c1ea7324fc825e26e881cacc8be8761fb93f7 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sat, 10 Jul 2021 11:20:41 +0000 Subject: [PATCH 07/15] change std::future to python concurrent.futures.Future --- include/boost/python/eventloop.hpp | 9 ++- src/eventloop.cpp | 116 +++++++++++++---------------- 2 files changed, 58 insertions(+), 67 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 0ccc9a650c..c768e1c4f7 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -80,17 +80,17 @@ class event_loop object sock_recv(object sock, size_t nbytes); - size_t sock_recv_into(object sock, object buffer); + object sock_recv_into(object sock, object buffer); object sock_sendall(object sock, object data); - void sock_connect(object sock, object address); + object sock_connect(object sock, object address); object sock_accept(object sock); - void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); + object sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true); - void start_tls(object transport, object protocol, object sslcontext, + object start_tls(object transport, object protocol, object sslcontext, bool server_side = false, object server_hostname = object(), object ssl_handshake_timeout = object()); @@ -123,6 +123,7 @@ class event_loop object _pymod_socket = import("socket"); object _pymod_traceback = import("traceback"); object _pymod_logger = import("asyncio.log").attr("logger"); + object _pymod_concurrent_future = import("concurrent").attr("futures"); object _exception_handler = object(); boost::asio::io_context::strand _strand; std::unordered_map> _id_to_timer_map; diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 9335f731a4..298e0b6ba4 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -27,24 +27,27 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } -void _sock_recv_handler( - std::promise>& prom_data, - std::promise& prom_nbytes_read, - size_t nbytes, - int fd) +void _sock_recv_handler(object fut, size_t nbytes, int fd) { std::vector buffer(nbytes); - prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes)); - prom_data.set_value(std::move(buffer)); + read(fd, buffer.data(), nbytes); + fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); } -void _sock_send_handler(std::promise& prom, int fd, const char *py_str, ssize_t len) +void _sock_recv_into_handler(object fut, size_t nbytes, int fd) { - size_t nwrite = write(fd, py_str, len); - prom.set_value(nwrite); + std::vector buffer(nbytes); + ssize_t nbytes_read = read(fd, buffer.data(), nbytes); + fut.attr("set_result")(nbytes_read); +} + +void _sock_send_handler(object fut, int fd, const char *py_str, ssize_t len) +{ + write(fd, py_str, len); + fut.attr("set_result")(object()); } -void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future& fut, object sock, object addr) +void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try { @@ -55,7 +58,7 @@ void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future PyErr_SetString(PyExc_OSError, "Connect call failed {address}"); throw_error_already_set(); } - prom.set_value(); + fut.attr("set_result")(object()); } catch (const error_already_set& e) { @@ -73,12 +76,12 @@ void _sock_connect_cb(object pymod_socket, std::promise& prom, std::future else { PyErr_Clear(); - prom.set_exception(std::current_exception()); + fut.attr("set_exception")(std::current_exception()); } } } -void _sock_accept(event_loop& loop, std::promise& prom, std::future& fut, object sock) +void _sock_accept(event_loop& loop, object fut, object sock) { int fd = extract(sock.attr("fileno")()); object conn; @@ -89,7 +92,7 @@ void _sock_accept(event_loop& loop, std::promise& prom, std::future& prom, std::future())); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) @@ -109,22 +112,22 @@ void _sock_accept(event_loop& loop, std::promise& prom, std::future& prom, +void _getaddrinfo_handler(object pymod_socket, object fut, object host, int port, int family, int type, int proto, int flags) { object res = pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); - prom.set_value(res); + fut.attr("set_result")(res); } -void _getnameinfo_handler(object pymod_socket, std::promise& prom, object sockaddr, int flags) +void _getnameinfo_handler(object pymod_socket, object fut, object sockaddr, int flags) { object res = pymod_socket.attr("getnameinfo")(sockaddr, flags); - prom.set_value(res); + fut.attr("set_result")(res); } } @@ -193,29 +196,20 @@ void event_loop::call_at(double when, object f) object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); - std::promise> prom_data; - std::future> fut_data = prom_data.get_future(); - std::promise prom_nbytes_read; - std::future fut_nbytes_read = prom_nbytes_read.get_future(); - add_reader(fd, make_function(bind(_sock_recv_handler, - boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + object fut = _pymod_concurrent_future.attr("Future")(); + add_reader(fd, make_function(bind(_sock_recv_handler, fut, nbytes, fd), default_call_policies(), boost::mpl::vector())); - return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); + return fut; } -size_t event_loop::sock_recv_into(object sock, object buffer) +object event_loop::sock_recv_into(object sock, object buffer) { int fd = extract(sock.attr("fileno")()); ssize_t nbytes = len(buffer); - std::promise> prom_data; - std::future> fut_data = prom_data.get_future(); - std::promise prom_nbytes_read; - std::future fut_nbytes_read = prom_nbytes_read.get_future(); - add_reader(fd, make_function(bind(_sock_recv_handler, - boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd), + object fut = _pymod_concurrent_future.attr("Future")(); + add_reader(fd, make_function(bind(_sock_recv_into_handler, fut, nbytes, fd), default_call_policies(), boost::mpl::vector())); - buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes))); - return fut_nbytes_read.get(); + return fut; } object event_loop::sock_sendall(object sock, object data) @@ -223,28 +217,25 @@ object event_loop::sock_sendall(object sock, object data) int fd = extract(sock.attr("fileno")()); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); - std::promise prom; - std::future fut = prom.get_future(); - add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len), + object fut = _pymod_concurrent_future.attr("Future")(); + add_writer(fd, make_function(bind(_sock_send_handler, fut, fd, py_str, py_str_len), default_call_policies(), boost::mpl::vector())); - fut.wait(); - return object(); + return fut; } -void event_loop::sock_connect(object sock, object address) +object event_loop::sock_connect(object sock, object address) { if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX")) { // TODO: _ensure_resolve } - std::promise prom; - std::future fut = prom.get_future(); + object fut = _pymod_concurrent_future.attr("Future")(); int fd = extract(sock.attr("fileno")()); try { sock.attr("connect")(address); - prom.set_value(); + fut.attr("set_result")(object()); } catch (const error_already_set& e) { @@ -253,7 +244,7 @@ void event_loop::sock_connect(object sock, object address) { PyErr_Clear(); add_writer(fd, make_function(bind( - _sock_connect_cb, _pymod_socket, boost::ref(prom), boost::ref(fut), sock, address), + _sock_connect_cb, _pymod_socket, fut, sock, address), default_call_policies(), boost::mpl::vector())); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) @@ -264,55 +255,54 @@ void event_loop::sock_connect(object sock, object address) else { PyErr_Clear(); - prom.set_exception(std::current_exception()); + fut.attr("set_exception")(std::current_exception()); } } - fut.wait(); + return fut; } object event_loop::sock_accept(object sock) { - std::promise prom; - std::future fut = prom.get_future(); - _sock_accept(*this, prom, fut, sock); - return fut.get(); + object fut = _pymod_concurrent_future.attr("Future")(); + _sock_accept(*this, fut, sock); + return fut; } // TODO: implement this -void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) +object event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback) { PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); throw_error_already_set(); + return object(); } // TODO: implement this -void event_loop::start_tls(object transport, object protocol, object sslcontext, +object event_loop::start_tls(object transport, object protocol, object sslcontext, bool server_side, object server_hostname, object ssl_handshake_timeout) { PyErr_SetString(PyExc_NotImplementedError, "Not implemented!"); throw_error_already_set(); + return object(); } object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { - std::promise prom; - std::future fut = prom.get_future(); + object fut = _pymod_concurrent_future.attr("Future")(); call_soon(make_function( - bind(_getaddrinfo_handler, _pymod_socket, boost::ref(prom), host, port, family, type, proto, flags), + bind(_getaddrinfo_handler, _pymod_socket, fut, host, port, family, type, proto, flags), default_call_policies(), boost::mpl::vector())); - return fut.get(); + return fut; } object event_loop::getnameinfo(object sockaddr, int flags) { - std::promise prom; - std::future fut = prom.get_future(); + object fut = _pymod_concurrent_future.attr("Future")(); call_soon(make_function( - bind(_getnameinfo_handler, _pymod_socket, boost::ref(prom), sockaddr, flags), + bind(_getnameinfo_handler, _pymod_socket, fut, sockaddr, flags), default_call_policies(), boost::mpl::vector())); - return fut.get(); + return fut; } void event_loop::default_exception_handler(object context) From 914a18da73e06d4ec120e3687bc810644dd63901 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 11 Jul 2021 07:23:15 +0000 Subject: [PATCH 08/15] remove _id_to_timer_map; change sock_recv_handlers to lambdas --- include/boost/python/eventloop.hpp | 6 +- src/eventloop.cpp | 113 ++++++++++++----------------- 2 files changed, 48 insertions(+), 71 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index c768e1c4f7..58b9e7bf3b 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -37,9 +37,7 @@ class event_loop // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. inline void call_soon(object f) { - _strand.post([f, loop=this] { - f(boost::ref(*loop)); - }); + _strand.post([f, loop=this] {f();}); return; } @@ -118,7 +116,6 @@ class event_loop void call_exception_handler(object context); private: - int64_t _timer_id = 0; object _pymod_ssl = object(); object _pymod_socket = import("socket"); object _pymod_traceback = import("traceback"); @@ -126,7 +123,6 @@ class event_loop object _pymod_concurrent_future = import("concurrent").attr("futures"); object _exception_handler = object(); boost::asio::io_context::strand _strand; - std::unordered_map> _id_to_timer_map; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 std::unordered_map> _descriptor_map; std::chrono::steady_clock::time_point _created_time; diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 298e0b6ba4..913506b46d 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -27,26 +27,6 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } -void _sock_recv_handler(object fut, size_t nbytes, int fd) -{ - std::vector buffer(nbytes); - read(fd, buffer.data(), nbytes); - fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); -} - -void _sock_recv_into_handler(object fut, size_t nbytes, int fd) -{ - std::vector buffer(nbytes); - ssize_t nbytes_read = read(fd, buffer.data(), nbytes); - fut.attr("set_result")(nbytes_read); -} - -void _sock_send_handler(object fut, int fd, const char *py_str, ssize_t len) -{ - write(fd, py_str, len); - fut.attr("set_result")(object()); -} - void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try @@ -117,19 +97,6 @@ void _sock_accept(event_loop& loop, object fut, object sock) } } -void _getaddrinfo_handler(object pymod_socket, object fut, - object host, int port, int family, int type, int proto, int flags) -{ - object res = pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); - fut.attr("set_result")(res); -} - -void _getnameinfo_handler(object pymod_socket, object fut, object sockaddr, int flags) -{ - object res = pymod_socket.attr("getnameinfo")(sockaddr, flags); - fut.attr("set_result")(res); -} - } void event_loop::_add_reader_or_writer(int fd, object f, int key) @@ -169,20 +136,11 @@ void event_loop::_remove_reader_or_writer(int key) void event_loop::call_later(double delay, object f) { - // add timer - _id_to_timer_map.emplace(_timer_id, - std::move(std::make_unique(_strand.context(), - std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9)))) - ); - - _id_to_timer_map.find(_timer_id)->second->async_wait( - // remove timer - boost::asio::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec) - { - loop->_id_to_timer_map.erase(id); - loop->call_soon(f); - })); - _timer_id++; + auto p_timer = std::make_shared(_strand.context(), + std::chrono::nanoseconds(int64_t(delay * 1e9))); + p_timer->async_wait(boost::asio::bind_executor( + _strand, + [f, p_timer, this] (const boost::system::error_code& ec) {f();})); } void event_loop::call_at(double when, object f) @@ -196,31 +154,48 @@ void event_loop::call_at(double when, object f) object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); - object fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd, make_function(bind(_sock_recv_handler, fut, nbytes, fd), + int fd_dup = dup(fd); + object py_fut = _pymod_concurrent_future.attr("Future")(); + add_reader(fd_dup, make_function( + [py_fut, nbytes, fd=fd_dup] (object obj) { + std::vector buffer(nbytes); + read(fd, buffer.data(), nbytes); + py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); + }, default_call_policies(), boost::mpl::vector())); - return fut; + return py_fut; } object event_loop::sock_recv_into(object sock, object buffer) { int fd = extract(sock.attr("fileno")()); + int fd_dup = dup(fd); ssize_t nbytes = len(buffer); - object fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd, make_function(bind(_sock_recv_into_handler, fut, nbytes, fd), + object py_fut = _pymod_concurrent_future.attr("Future")(); + add_reader(fd_dup, make_function( + [py_fut, nbytes, fd=fd_dup] (object obj) { + std::vector buffer(nbytes); + ssize_t nbytes_read = read(fd, buffer.data(), nbytes); + py_fut.attr("set_result")(nbytes_read); + }, default_call_policies(), boost::mpl::vector())); - return fut; + return py_fut; } object event_loop::sock_sendall(object sock, object data) { int fd = extract(sock.attr("fileno")()); + int fd_dup = dup(fd); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); - object fut = _pymod_concurrent_future.attr("Future")(); - add_writer(fd, make_function(bind(_sock_send_handler, fut, fd, py_str, py_str_len), + object py_fut = _pymod_concurrent_future.attr("Future")(); + add_writer(fd_dup, make_function( + [py_fut, fd, py_str, py_str_len] (object obj) { + write(fd, py_str, py_str_len); + py_fut.attr("set_result")(object()); + }, default_call_policies(), boost::mpl::vector())); - return fut; + return py_fut; } object event_loop::sock_connect(object sock, object address) @@ -243,7 +218,7 @@ object event_loop::sock_connect(object sock, object address) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - add_writer(fd, make_function(bind( + add_writer(dup(fd), make_function(bind( _sock_connect_cb, _pymod_socket, fut, sock, address), default_call_policies(), boost::mpl::vector())); } @@ -287,22 +262,28 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { - object fut = _pymod_concurrent_future.attr("Future")(); + object py_fut = _pymod_concurrent_future.attr("Future")(); call_soon(make_function( - bind(_getaddrinfo_handler, _pymod_socket, fut, host, port, family, type, proto, flags), + [this, py_fut, host, port, family, type, proto, flags] (object obj) { + object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); + py_fut.attr("set_result")(res); + }, default_call_policies(), boost::mpl::vector())); - return fut; + return py_fut; } object event_loop::getnameinfo(object sockaddr, int flags) { - object fut = _pymod_concurrent_future.attr("Future")(); + object py_fut = _pymod_concurrent_future.attr("Future")(); call_soon(make_function( - bind(_getnameinfo_handler, _pymod_socket, fut, sockaddr, flags), + [this, py_fut, sockaddr, flags] (object obj) { + object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); + py_fut.attr("set_result")(res); + }, default_call_policies(), boost::mpl::vector())); - return fut; + return py_fut; } void event_loop::default_exception_handler(object context) @@ -411,9 +392,9 @@ void event_loop::call_exception_handler(object context) PyObject *ptype, *pvalue, *ptraceback; PyErr_Fetch(&ptype, &pvalue, &ptraceback); PyErr_NormalizeException(&ptype, &pvalue, &ptraceback); - object type(handle<>(ptype)); - object value(handle<>(pvalue)); - object traceback(handle<>(ptraceback)); + object type{handle<>(ptype)}; + object value{handle<>(pvalue)}; + object traceback{handle<>(ptraceback)}; try { dict tmp_dict; From 570a387dab3db4860b1625634ddda378ebffa292 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 11 Jul 2021 09:45:44 +0000 Subject: [PATCH 09/15] replace make_function with lambda --- include/boost/python/eventloop.hpp | 48 +++++++++--- src/eventloop.cpp | 122 +++++++++-------------------- 2 files changed, 78 insertions(+), 92 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 58b9e7bf3b..a9c813ff51 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -37,8 +37,7 @@ class event_loop // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. inline void call_soon(object f) { - _strand.post([f, loop=this] {f();}); - return; + _strand.post([f]{f();}); } // TODO: implement this @@ -57,22 +56,22 @@ class event_loop inline void add_reader(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2); + _async_wait_fd(fd, f, _read_key(fd)); } inline void remove_reader(int fd) { - _remove_reader_or_writer(fd * 2); + _descriptor_map.erase(_read_key(fd)); } inline void add_writer(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2 + 1); + _async_wait_fd(fd, f, _write_key(fd)); } inline void remove_writer(int fd) { - _remove_reader_or_writer(fd * 2 + 1); + _descriptor_map.erase(_write_key(fd)); } @@ -119,7 +118,8 @@ class event_loop object _pymod_ssl = object(); object _pymod_socket = import("socket"); object _pymod_traceback = import("traceback"); - object _pymod_logger = import("asyncio.log").attr("logger"); + object _py_wrap_future = import("asyncio").attr("wrap_future"); + object _py_logger = import("asyncio.log").attr("logger"); object _pymod_concurrent_future = import("concurrent").attr("futures"); object _exception_handler = object(); boost::asio::io_context::strand _strand; @@ -127,8 +127,38 @@ class event_loop std::unordered_map> _descriptor_map; std::chrono::steady_clock::time_point _created_time; - void _add_reader_or_writer(int fd, object f, int key); - void _remove_reader_or_writer(int key); + inline int _read_key(int fd) + { + return fd * 2; + } + + inline int _write_key(int fd) + { + return fd * 2 + 1; + } + + template + void _async_wait_fd(int fd, F f, int key) + { + // add descriptor + if (_descriptor_map.find(key) == _descriptor_map.end()) + { + _descriptor_map.emplace(key, + std::move(std::make_unique(_strand.context(), fd)) + ); + } + + _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, + boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec) + { + _descriptor_map.erase(key); + f(); + })); + return; + } + + static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr); + static void _sock_accept(event_loop& loop, object fut, object sock); }; }}} diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 913506b46d..3bfeb9aa2f 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } -void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) +} + +void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try { @@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) } } -void _sock_accept(event_loop& loop, object fut, object sock) +void event_loop::_sock_accept(event_loop& loop, object fut, object sock) { int fd = extract(sock.attr("fileno")()); - object conn; - object address; + object conn, address; try { object ret = sock.attr("accept")(); @@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - loop.add_reader(fd, make_function(bind( - _sock_accept, boost::ref(loop), fut, sock), - default_call_policies(), boost::mpl::vector())); + loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -94,43 +93,6 @@ void _sock_accept(event_loop& loop, object fut, object sock) PyErr_Clear(); fut.attr("set_exception")(std::current_exception()); } - } -} - -} - -void event_loop::_add_reader_or_writer(int fd, object f, int key) -{ - // add descriptor - if (_descriptor_map.find(key) == _descriptor_map.end()) - { - _descriptor_map.emplace(key, - std::move(std::make_unique(_strand.context(), fd)) - ); - } - - _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, - boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) - { - // move descriptor - auto iter = loop->_descriptor_map.find(key); - if (iter != loop->_descriptor_map.end()) - { - iter->second->release(); - loop->_descriptor_map.erase(iter); - } - loop->call_soon(f); - })); - return; -} - -void event_loop::_remove_reader_or_writer(int key) -{ - auto iter = _descriptor_map.find(key); - if (iter != _descriptor_map.end()) - { - iter->second->release(); - _descriptor_map.erase(iter); } } @@ -155,14 +117,14 @@ object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); }, - default_call_policies(), boost::mpl::vector())); + _read_key(fd)); return py_fut; } @@ -171,14 +133,14 @@ object event_loop::sock_recv_into(object sock, object buffer) int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); ssize_t nbytes = len(buffer); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); ssize_t nbytes_read = read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(nbytes_read); - }, - default_call_policies(), boost::mpl::vector())); + }, + _read_key(fd)); return py_fut; } @@ -188,13 +150,13 @@ object event_loop::sock_sendall(object sock, object data) int fd_dup = dup(fd); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_writer(fd_dup, make_function( - [py_fut, fd, py_str, py_str_len] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, fd, py_str, py_str_len] { write(fd, py_str, py_str_len); py_fut.attr("set_result")(object()); - }, - default_call_policies(), boost::mpl::vector())); + }, + _write_key(fd)); return py_fut; } @@ -205,12 +167,12 @@ object event_loop::sock_connect(object sock, object address) { // TODO: _ensure_resolve } - object fut = _pymod_concurrent_future.attr("Future")(); + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); int fd = extract(sock.attr("fileno")()); try { sock.attr("connect")(address); - fut.attr("set_result")(object()); + py_fut.attr("set_result")(object()); } catch (const error_already_set& e) { @@ -218,9 +180,7 @@ object event_loop::sock_connect(object sock, object address) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - add_writer(dup(fd), make_function(bind( - _sock_connect_cb, _pymod_socket, fut, sock, address), - default_call_policies(), boost::mpl::vector())); + _async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -230,17 +190,17 @@ object event_loop::sock_connect(object sock, object address) else { PyErr_Clear(); - fut.attr("set_exception")(std::current_exception()); + py_fut.attr("set_exception")(std::current_exception()); } } - return fut; + return py_fut; } object event_loop::sock_accept(object sock) { - object fut = _pymod_concurrent_future.attr("Future")(); - _sock_accept(*this, fut, sock); - return fut; + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _sock_accept(*this, py_fut, sock); + return py_fut; } // TODO: implement this @@ -262,27 +222,23 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { - object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, host, port, family, type, proto, flags] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _strand.post( + [this, py_fut, host, port, family, type, proto, flags] { object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; } object event_loop::getnameinfo(object sockaddr, int flags) { - object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, sockaddr, flags] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _strand.post( + [this, py_fut, sockaddr, flags] { object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; } @@ -345,7 +301,7 @@ void event_loop::default_exception_handler(object context) dict kwargs; args.append(str("\n").join(log_lines)); kwargs["exc_info"] = exc_info; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } void event_loop::call_exception_handler(object context) @@ -370,7 +326,7 @@ void event_loop::call_exception_handler(object context) dict kwargs; args.append(str("Exception in default exception handler")); kwargs["exc_info"] = true; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } } } @@ -416,7 +372,7 @@ void event_loop::call_exception_handler(object context) boost::python::dict kwargs; args.append(str("Exception in default exception handler")); kwargs["exc_info"] = true; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } } } From 53666c6c3070a5ec790d3bb731dbfc3c141157dc Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 11 Jul 2021 13:38:18 +0000 Subject: [PATCH 10/15] replace call_later call in call_at with other code --- src/eventloop.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 3bfeb9aa2f..3cae57f509 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -98,10 +98,10 @@ void event_loop::_sock_accept(event_loop& loop, object fut, object sock) void event_loop::call_later(double delay, object f) { - auto p_timer = std::make_shared(_strand.context(), - std::chrono::nanoseconds(int64_t(delay * 1e9))); - p_timer->async_wait(boost::asio::bind_executor( - _strand, + auto p_timer = std::make_shared( + _strand.context(), + std::chrono::nanoseconds(int64_t(delay * 1e9))); + p_timer->async_wait(boost::asio::bind_executor(_strand, [f, p_timer, this] (const boost::system::error_code& ec) {f();})); } @@ -109,8 +109,15 @@ void event_loop::call_at(double when, object f) { double diff = when - time(); if (diff > 0) - return call_later(diff, f); - return call_soon(f); + { + auto p_timer = std::make_shared( + _strand.context(), + std::chrono::nanoseconds(int64_t(diff * 1e9))); + p_timer->async_wait(boost::asio::bind_executor(_strand, + [f, p_timer, this] (const boost::system::error_code& ec) {f();})); + return; + } + call_soon(f); } object event_loop::sock_recv(object sock, size_t nbytes) From a9efac6d664f2cafc69930c9984dfeeb3fd1f02c Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 11 Jul 2021 17:24:17 +0000 Subject: [PATCH 11/15] fix issues - namespace comment hint - dup() errno check - fix call_at and time format issue --- include/boost/python/eventloop.hpp | 8 +++--- src/eventloop.cpp | 41 +++++++++++++++++++----------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index a9c813ff51..2ab10774d1 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -18,8 +18,7 @@ namespace boost { namespace python { namespace asio { class event_loop { public: - event_loop(boost::asio::io_context::strand& strand): - _strand{strand}, _created_time{std::chrono::steady_clock::now()} + event_loop(const boost::asio::io_context::strand& strand): _strand{strand} { try { @@ -51,7 +50,7 @@ class event_loop inline double time() { - return static_cast>(std::chrono::steady_clock::now() - _created_time).count(); + return std::chrono::steady_clock::now().time_since_epoch().count(); } inline void add_reader(int fd, object f) @@ -125,7 +124,6 @@ class event_loop boost::asio::io_context::strand _strand; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 std::unordered_map> _descriptor_map; - std::chrono::steady_clock::time_point _created_time; inline int _read_key(int fd) { @@ -161,7 +159,7 @@ class event_loop static void _sock_accept(event_loop& loop, object fut, object sock); }; -}}} +}}} // namespace boost::python # endif diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 3cae57f509..7ae6d838d6 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -9,6 +9,7 @@ // 3. _ensure_fd_no_transport // 4. _ensure_resolve +#include #include #include #include @@ -27,8 +28,14 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } +void raise_dup_error() +{ + PyErr_SetString(PyExc_OSError, std::system_category().message(errno).c_str()); + throw_error_already_set(); } +} // namespace + void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try @@ -100,30 +107,27 @@ void event_loop::call_later(double delay, object f) { auto p_timer = std::make_shared( _strand.context(), - std::chrono::nanoseconds(int64_t(delay * 1e9))); + std::chrono::duration_cast(std::chrono::duration(delay))); p_timer->async_wait(boost::asio::bind_executor(_strand, - [f, p_timer, this] (const boost::system::error_code& ec) {f();})); + [f, p_timer] (const boost::system::error_code& ec) {f();})); } void event_loop::call_at(double when, object f) { - double diff = when - time(); - if (diff > 0) - { - auto p_timer = std::make_shared( - _strand.context(), - std::chrono::nanoseconds(int64_t(diff * 1e9))); - p_timer->async_wait(boost::asio::bind_executor(_strand, - [f, p_timer, this] (const boost::system::error_code& ec) {f();})); - return; - } - call_soon(f); + using sc = std::chrono::steady_clock; + auto p_timer = std::make_shared( + _strand.context(), + sc::duration(static_cast(when))); + p_timer->async_wait(boost::asio::bind_executor(_strand, + [f, p_timer] (const boost::system::error_code& ec) {f();})); } object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); _async_wait_fd(fd_dup, [py_fut, nbytes, fd=fd_dup] { @@ -139,6 +143,8 @@ object event_loop::sock_recv_into(object sock, object buffer) { int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); ssize_t nbytes = len(buffer); object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); _async_wait_fd(fd_dup, @@ -155,6 +161,8 @@ object event_loop::sock_sendall(object sock, object data) { int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); @@ -187,7 +195,10 @@ object event_loop::sock_connect(object sock, object address) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - _async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); + int fd_dup = dup(fd); + if (fd_dup == -1) + raise_dup_error(); + _async_wait_fd(fd_dup, bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -388,4 +399,4 @@ void event_loop::call_exception_handler(object context) } -}}} \ No newline at end of file +}}} // namespace boost::python \ No newline at end of file From afcccbc1f77eb23114e578eed14c8bfce56609f3 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 8 Aug 2021 12:53:45 +0000 Subject: [PATCH 12/15] add interface to set default eventloop --- include/boost/python/eventloop.hpp | 4 ++- src/eventloop.cpp | 47 ++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 2ab10774d1..f708483a23 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace boost { namespace python { namespace asio { @@ -159,7 +160,8 @@ class event_loop static void _sock_accept(event_loop& loop, object fut, object sock); }; -}}} // namespace boost::python +void set_default_event_loop(const boost::asio::io_context::strand& strand); +}}} // namespace boost::python # endif diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 7ae6d838d6..e99f4e42b9 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -398,5 +398,52 @@ void event_loop::call_exception_handler(object context) } } +void set_default_event_loop(const boost::asio::io_context::strand& strand) +{ + class_("BoostAsioEventLoop", init()) + .def("call_soon", &event_loop::call_soon) + .def("call_soon_thread_safe", &event_loop::call_soon_thread_safe) + .def("call_later", &event_loop::call_later) + .def("call_at", &event_loop::call_at) + .def("time", &event_loop::time) + .def("add_reader", &event_loop::add_reader) + .def("remove_reader", &event_loop::remove_reader) + .def("add_writer", &event_loop::add_writer) + .def("remove_writer", &event_loop::remove_writer) + .def("sock_recv", &event_loop::sock_recv) + .def("sock_recv_into", &event_loop::sock_recv_into) + .def("sock_sendall", &event_loop::sock_sendall) + .def("sock_connect", &event_loop::sock_connect) + .def("sock_accept", &event_loop::sock_accept) + .def("sock_sendfile", &event_loop::sock_sendfile) + .def("start_tls", &event_loop::start_tls) + .def("getaddrinfo", &event_loop::getaddrinfo) + .def("getnameinfo", &event_loop::getnameinfo) + .def("set_exception_handler", &event_loop::set_exception_handler) + .def("get_exception_handler", &event_loop::get_exception_handler) + .def("default_exception_handler", &event_loop::default_exception_handler) + .def("call_exception_handler", &event_loop::call_exception_handler); + + object asyncio = import("asyncio"); + object abstract_policy = asyncio.attr("AbstractEventLoopPolicy"); + + dict method_dict; + std::shared_ptr p_loop = std::make_shared(strand); + + method_dict["get_event_loop"] = make_function( + [p_loop] (object e) {return object(boost::ref(*p_loop));}, + default_call_policies(), + boost::mpl::vector() + ); + + object class_boost_policy = call( + (PyObject*)&PyType_Type, + str("BoostEventLoopPolicy"), + boost::python::make_tuple(abstract_policy), + method_dict); + + object boost_policy_instance = class_boost_policy.attr("__call__")(); + asyncio.attr("set_event_loop_policy")(boost_policy_instance); +} }}} // namespace boost::python \ No newline at end of file From e4ec199e16c7e6fd8d011fa72d8e012797294e7d Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Mon, 9 Aug 2021 19:46:04 +0000 Subject: [PATCH 13/15] add event_loop doc --- doc/reference/eventloop.qbk | 95 +++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 doc/reference/eventloop.qbk diff --git a/doc/reference/eventloop.qbk b/doc/reference/eventloop.qbk new file mode 100644 index 0000000000..42f1ba5145 --- /dev/null +++ b/doc/reference/eventloop.qbk @@ -0,0 +1,95 @@ +[chapter Event Loop + [quickbook 1.7] +] + +[section boost/python/eventloop.hpp] + +[section Introduction] +Provide a Boost.Asio-based implementation for the Python [@https://docs.python.org/3/library/asyncio-eventloop.html `EventLoop`] type. Every callback is scheduled in strand. +[endsect] + +[section Function `set_default_event_loop`] +`` +void set_default_event_loop(const boost::asio::io_context::strand& strand); +`` +[variablelist +[[Effect][construct an `event_loop` object using provided [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/overview/core/strands.html `strand`] object. Setup a new [@https://docs.python.org/3/library/asyncio-policy.html event loop policy], when user call `get_event_loop` using that policy, it returns the Boost Asio `event_loop` object]] +[[Throws][nothing]] +] +[endsect] + +[section Function `PyInit_boost_event_loop`] +`` +extern "C" +{ + PyObject* PyInit_boost_event_loop(); +} +`` +[variablelist +[[Effect][user must call `PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop);` before [@https://docs.python.org/3/c-api/init.html#c.Py_Initialize `Py_Initialize`]]] +[[Throws][nothing]] +] +[endsect] + +[section Example] +`` +// example.cpp +io_context ctx; +io_context::strand st{ctx}; + +PyImport_AppendInittab("boost_event_loop", &PyInit_boost_event_loop); +Py_Initialize(); +set_default_event_loop(st); + +object py_module = import("example.py"); +py_module.attr("hello_world")(); +st.context().run(); + +// example.py +import asyncio +def hello_world(): + print("hello world") + +def call_event_loop(): + loop = asyncio.get_event_loop_policy().get_event_loop() + loop.call_soon(hello_world) +`` +Note: `set_default_event_loop` must be called before any Python module is imported. Otherwise it may result in the module-level variables registered against the default asyncio eventloop instead the boost asio eventloop. Here is an example demonstrating the issue. +`` +// bad_import.cpp +Py_Initialize(); +import("example_module"); // example_module is initialized +set_default_event_loop(st); // boost_asio_eventloop is set to default, but the example_module.lock was registered against the old eventloop + +// example_module.py +import asyncio +lock = asyncio.Lock() +`` +[endsect] + +[section Event Loop and Multiple Python Sub-interpreters] +It's allowed to have multiple Python sub-interpreter instances in a same program. Each interpreter will act as a guest VM, and C++ host will schedule all the asynchronous events committed by the Python VM.[br] +The Python interpreter must outlive the [@https://www.boost.org/doc/libs/1_76_0/doc/html/boost_asio/reference/io_service.html `asio::io_context`] objects it owns. It's not safe to destroy the interpreter midways.[br] +[endsect] + +[section [@https://docs.python.org/3/c-api/init.html#thread-state-and-the-global-interpreter-lock `GIL`]] +`boost::asio::io_context::run()` may be called from multiple threads and the completion handlers may wakeup on different threads as well. The GIL must be released after setting up the Python IO object and before the call to `boost::asio::io_context::run()`. In the completion handler, the GIL must be reacquired and release before it calls into Python to deliver the result. +`` +// gil.cpp +py_func(); // call func imported from gil.py +PyEval_ReleaseLock(); // release GIL +ctx.run(); + +// gil.py +import asyncio + +def hello(): + print("hello_world") + +def func(): + loop = asyncio.get_event_loop_policy().get_event_loop() + loop.call_soon(hello) +`` +[endsect] + +[endsect] \ No newline at end of file From a2e1238a4934ac92e4443cd3351735b7fb0ebdc9 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sat, 21 Aug 2021 19:54:15 +0000 Subject: [PATCH 14/15] add create_future --- include/boost/python/eventloop.hpp | 12 ++++++++++-- src/eventloop.cpp | 25 +++++++++++++++++-------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index f708483a23..65b4a0e9fe 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -114,13 +114,21 @@ class event_loop void call_exception_handler(object context); + object create_future(); + + // TODO + inline bool get_debug() + { + return false; + } + private: object _pymod_ssl = object(); object _pymod_socket = import("socket"); object _pymod_traceback = import("traceback"); - object _py_wrap_future = import("asyncio").attr("wrap_future"); + object _pymod_asyncio_futures = import("asyncio").attr("futures"); object _py_logger = import("asyncio.log").attr("logger"); - object _pymod_concurrent_future = import("concurrent").attr("futures"); + object _pymod_concurrent_futures = import("concurrent").attr("futures"); object _exception_handler = object(); boost::asio::io_context::strand _strand; // read: key = fd * 2 + 0, write: key = fd * 2 + 1 diff --git a/src/eventloop.cpp b/src/eventloop.cpp index e99f4e42b9..a18af5fbcc 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -128,7 +128,7 @@ object event_loop::sock_recv(object sock, size_t nbytes) int fd_dup = dup(fd); if (fd_dup == -1) raise_dup_error(); - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); @@ -146,7 +146,7 @@ object event_loop::sock_recv_into(object sock, object buffer) if (fd_dup == -1) raise_dup_error(); ssize_t nbytes = len(buffer); - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); @@ -165,7 +165,7 @@ object event_loop::sock_sendall(object sock, object data) raise_dup_error(); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, fd, py_str, py_str_len] { write(fd, py_str, py_str_len); @@ -182,7 +182,7 @@ object event_loop::sock_connect(object sock, object address) { // TODO: _ensure_resolve } - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); int fd = extract(sock.attr("fileno")()); try { @@ -216,7 +216,7 @@ object event_loop::sock_connect(object sock, object address) object event_loop::sock_accept(object sock) { - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _sock_accept(*this, py_fut, sock); return py_fut; } @@ -240,7 +240,7 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _strand.post( [this, py_fut, host, port, family, type, proto, flags] { object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); @@ -251,7 +251,7 @@ object event_loop::getaddrinfo(object host, int port, int family, int type, int object event_loop::getnameinfo(object sockaddr, int flags) { - object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + object py_fut = create_future(); _strand.post( [this, py_fut, sockaddr, flags] { object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); @@ -398,6 +398,13 @@ void event_loop::call_exception_handler(object context) } } +object event_loop::create_future() +{ + boost::python::dict kwargs; + kwargs["loop"] = boost::ref(*this); + return _pymod_asyncio_futures.attr("Future")(*boost::python::tuple(), **kwargs); +} + void set_default_event_loop(const boost::asio::io_context::strand& strand) { class_("BoostAsioEventLoop", init()) @@ -422,7 +429,9 @@ void set_default_event_loop(const boost::asio::io_context::strand& strand) .def("set_exception_handler", &event_loop::set_exception_handler) .def("get_exception_handler", &event_loop::get_exception_handler) .def("default_exception_handler", &event_loop::default_exception_handler) - .def("call_exception_handler", &event_loop::call_exception_handler); + .def("call_exception_handler", &event_loop::call_exception_handler) + .def("create_future", &event_loop::create_future) + .def("get_debug", &event_loop::get_debug); object asyncio = import("asyncio"); object abstract_policy = asyncio.attr("AbstractEventLoopPolicy"); From 1d3200213d2bd72c16af2843befdede4c9cedbc0 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 22 Aug 2021 13:35:05 +0000 Subject: [PATCH 15/15] fix multiple issues change post([f]{f()}) to post(f) fix closing namespace comment add TODO for windows socket remove iostream include add GIL release and acuiqre in completion handlers --- include/boost/python/eventloop.hpp | 8 ++++++-- src/eventloop.cpp | 23 ++++++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 65b4a0e9fe..6fadbe409b 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -37,7 +37,11 @@ class event_loop // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. inline void call_soon(object f) { - _strand.post([f]{f();}); + _strand.post([f]{ + PyEval_AcquireLock(); + f(); + PyEval_ReleaseLock(); + }); } // TODO: implement this @@ -170,6 +174,6 @@ class event_loop void set_default_event_loop(const boost::asio::io_context::strand& strand); -}}} // namespace boost::python +}}} // namespace boost::python::asio # endif diff --git a/src/eventloop.cpp b/src/eventloop.cpp index a18af5fbcc..47bface908 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -10,7 +10,6 @@ // 4. _ensure_resolve #include -#include #include #include #include @@ -109,7 +108,11 @@ void event_loop::call_later(double delay, object f) _strand.context(), std::chrono::duration_cast(std::chrono::duration(delay))); p_timer->async_wait(boost::asio::bind_executor(_strand, - [f, p_timer] (const boost::system::error_code& ec) {f();})); + [f, p_timer] (const boost::system::error_code& ec) { + PyEval_AcquireLock(); + f(); + PyEval_ReleaseLock(); + })); } void event_loop::call_at(double when, object f) @@ -122,6 +125,7 @@ void event_loop::call_at(double when, object f) [f, p_timer] (const boost::system::error_code& ec) {f();})); } +// TODO: support windows socket object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); @@ -131,14 +135,17 @@ object event_loop::sock_recv(object sock, size_t nbytes) object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, nbytes, fd=fd_dup] { + PyEval_AcquireLock(); std::vector buffer(nbytes); read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); + PyEval_ReleaseLock(); }, _read_key(fd)); return py_fut; } +// TODO: support windows socket object event_loop::sock_recv_into(object sock, object buffer) { int fd = extract(sock.attr("fileno")()); @@ -149,14 +156,17 @@ object event_loop::sock_recv_into(object sock, object buffer) object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, nbytes, fd=fd_dup] { + PyEval_AcquireLock(); std::vector buffer(nbytes); ssize_t nbytes_read = read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(nbytes_read); + PyEval_ReleaseLock(); }, _read_key(fd)); return py_fut; } +// TODO: support windows socket object event_loop::sock_sendall(object sock, object data) { int fd = extract(sock.attr("fileno")()); @@ -168,13 +178,16 @@ object event_loop::sock_sendall(object sock, object data) object py_fut = create_future(); _async_wait_fd(fd_dup, [py_fut, fd, py_str, py_str_len] { + PyEval_AcquireLock(); write(fd, py_str, py_str_len); py_fut.attr("set_result")(object()); + PyEval_ReleaseLock(); }, _write_key(fd)); return py_fut; } +// TODO: support windows socket object event_loop::sock_connect(object sock, object address) { @@ -243,8 +256,10 @@ object event_loop::getaddrinfo(object host, int port, int family, int type, int object py_fut = create_future(); _strand.post( [this, py_fut, host, port, family, type, proto, flags] { + PyEval_AcquireLock(); object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); py_fut.attr("set_result")(res); + PyEval_ReleaseLock(); }); return py_fut; } @@ -254,8 +269,10 @@ object event_loop::getnameinfo(object sockaddr, int flags) object py_fut = create_future(); _strand.post( [this, py_fut, sockaddr, flags] { + PyEval_AcquireLock(); object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); py_fut.attr("set_result")(res); + PyEval_ReleaseLock(); }); return py_fut; } @@ -455,4 +472,4 @@ void set_default_event_loop(const boost::asio::io_context::strand& strand) asyncio.attr("set_event_loop_policy")(boost_policy_instance); } -}}} // namespace boost::python \ No newline at end of file +}}} // namespace boost::python::asio \ No newline at end of file