diff --git a/README.md b/README.md index 56ff47a..9ea2997 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ While not directly supported by pycapnp, a tool has been created to help generat ## Python Versions -Python 3.7+ is supported. +Python 3.8+ is supported. ## Development diff --git a/capnp/lib/capnp.pyx b/capnp/lib/capnp.pyx index dbb92b7..ba74a75 100644 --- a/capnp/lib/capnp.pyx +++ b/capnp/lib/capnp.pyx @@ -438,8 +438,10 @@ cdef class _DynamicListReader: cdef class _DynamicResizableListBuilder: """Class for building growable Cap'n Proto Lists - .. warning:: You need to call :meth:`finish` on this object before serializing the Cap'n Proto message. - Failure to do so will cause your objects not to be written out as well as leaking orphan structs into your message. + .. warning:: + You need to call :meth:`finish` on this object before serializing the + Cap'n Proto message. Failure to do so will cause your objects not to be + written out as well as leaking orphan structs into your message. This class works much like :class:`_DynamicListBuilder`, but it allows growing the list dynamically. It is meant for lists of structs, since for primitive types like int or float, you're much better off @@ -1436,9 +1438,11 @@ cdef class _DynamicStructBuilder: This is only meant for lists of Cap'n Proto objects, since for primitive types you can just define a normal python list and fill it yourself. - .. warning:: You need to call :meth:`_DynamicResizableListBuilder.finish` on the - list object before serializing the Cap'n Proto message. Failure to do so will cause - your objects not to be written out as well as leaking orphan structs into your message. + .. warning:: + You need to call :meth:`_DynamicResizableListBuilder.finish` on the + list object before serializing the Cap'n Proto message. Failure to do + so will cause your objects not to be written out as well as leaking + orphan structs into your message. :type field: str :param field: The field name to initialize @@ -1850,6 +1854,16 @@ cdef class _EventLoop: @_asynccontextmanager async def kj_loop(): + """Context manager for running the KJ event loop + + As long as the context manager is active it is guaranteed that the KJ event + loop is running. When the context manager is exited, the KJ event loop is + shut down properly and pending tasks are cancelled. + + :raises [RuntimeError]: If the KJ event loop is already running (on this thread). + + .. warning:: Every capnp rpc call required a running KJ event loop. + """ asyncio_loop = asyncio.get_running_loop() if hasattr(asyncio_loop, '_kj_loop'): raise RuntimeError("The KJ event-loop is already running (on this thread).") @@ -1876,6 +1890,12 @@ async def kj_loop(): kj_loop.close() async def run(coro): + """Ensure that the coroutine runs while the KJ event loop is running + + This is a shortcut for wrapping the coroutine in a :py:meth:`capnp.kj_loop` context manager. + + :param coro: Coroutine to run + """ async with kj_loop(): return await coro @@ -4391,7 +4411,7 @@ def add_import_hook(additional_paths=[]): :type additional_paths: list :param additional_paths: Additional paths, listed as strings, to be used to search for the .capnp files. - It is prepended to the beginning of sys.path. It also affects imports inside of Cap'n Proto schemas. + It is prepended to the beginning of sys.path. It also affects imports inside of Cap'n Proto schemas. """ global _importer if _importer is not None: diff --git a/docs/capnp.rst b/docs/capnp.rst index fda8688..0060542 100644 --- a/docs/capnp.rst +++ b/docs/capnp.rst @@ -14,36 +14,6 @@ Classes RPC ~~~ -Promise -####### - -.. autoclass:: Promise - :members: - :undoc-members: - :inherited-members: - -Promise may be one of: - -* :meth:`capnp.lib.capnp._Promise` -* :meth:`capnp.lib.capnp._RemotePromise` -* :meth:`capnp.lib.capnp._VoidPromise` - -.. autoclass:: capnp.lib.capnp._Promise - :members: - :undoc-members: - :inherited-members: - -.. autoclass:: capnp.lib.capnp._RemotePromise - :members: - :undoc-members: - :inherited-members: - -.. autoclass:: capnp.lib.capnp._VoidPromise - :members: - :undoc-members: - :inherited-members: - - Communication ############# @@ -96,16 +66,14 @@ Miscellaneous Functions --------- .. autofunction:: add_import_hook +.. autofunction:: remove_import_hook .. autofunction:: cleanup_global_schema_parser -.. autofunction:: create_event_loop -.. autofunction:: getTimer -.. autofunction:: join_promises + +.. autofunction:: kj_loop +.. autofunction:: run + .. autofunction:: load -.. autofunction:: poll_once -.. autofunction:: remove_event_loop -.. autofunction:: remove_import_hook -.. autofunction:: reset_event_loop -.. autofunction:: wait_forever + Internal Classes diff --git a/docs/conf.py b/docs/conf.py index 7e29d6f..e2c7711 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -307,6 +307,6 @@ # Allow duplicate toc entries. # epub_tocdup = True -intersphinx_mapping = {"http://docs.python.org/": None} +intersphinx_mapping = {"": ("http://docs.python.org/", None)} smv_branch_whitelist = r"^master$" diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 132d22b..e7d7a10 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -65,7 +65,7 @@ Const values ~~~~~~~~~~~~ Const values show up just as you'd expect under the loaded schema. For example:: - print addressbook_capnp.qux + print(addressbook_capnp.qux) # 123 @@ -139,7 +139,7 @@ If you assign an invalid value to one, you will get a ValueError:: --------------------------------------------------------------------------- ValueError Traceback (most recent call last) ... - ValueError: src/capnp/schema.c++:326: requirement not met: enum has no such enumerant; name = foo + AttributeError: capnp/schema.c++:566: failed: enum has no such enumerant; name = foo Unions @@ -157,10 +157,11 @@ Also, one weird case is for Void types in Unions (and in general, but Void is re .. note:: One caveat for unions is having structs as union members. Let us assume `employment.school` was actually a struct with a field of type `Text` called `name`:: alice.employment.school.name = "MIT" - # Raises a ValueError + # Raises a KjException The problem is that a struct within a union isn't initialized automatically. You have to do the following:: + TODO Broken school = alice.employment.init('school') school.name = "MIT" @@ -334,136 +335,87 @@ Cap'n Proto has a rich RPC protocol. You should read the `RPC specification `_. Please refer to it to understand the interfaces that will be used. -Asyncio support was added to pycapnp in v1.0.0 utilizing the TwoWayPipe interface to libcapnp (instead of having libcapnp control the socket communication). The main advantage here is that standard Python socket libraries can be used with pycapnp (more importantly, TLS/SSL). Asyncio requires a bit more boiler plate to get started but it does allow for a lot more control than using the pycapnp socket wrapper. +Asyncio support was added to pycapnp in v1.0.0. Since v2.0.0 the usage of asyncio is mandatory for rpc calls. This allows for a more robust and flexible rpc implementation. +KJ Event loop +^^^^^^^^^^^^^ -Client -~~~~~~ +Cap'n Proto uses the KJ event loop for its RPC implementation. Pycapnp handles all the required mapping between the asyncio event loop and the KJ event loop. +To ensure proper creation, usage and cleanup of the KJ event loop is controlled through a context manager called `kj_loop`. All RPC calls must be made within the context manager. -There are two ways to start a client: libcapnp socket wrapper and asyncio. -The wrapper is easier to implement but is very limited (doesn't support SSL/TLS with Python). -asyncio requires more setup and can be harder to debug; however, it does support SSL/TLS and has more control over the socket error conditions. asyncio also helps get around the threading limitations around the current pycapnp implementation has with libcapnp (pycapnp objects and functions must all be in the same thread). + import capnp + import asyncio + async def main(): + async with capnp.kj_loop(): + # RPC calls here + + asyncio.run(main()) -Starting a Client -################# -Starting a client is very easy:: +To simplify the usage the helper function `capnp.run` can be used to execute a asyncio coroutine within the context manager. import capnp - import calculator_capnp + import asyncio - client = capnp.TwoPartyClient('localhost:60000') + async def main(): + # RPC calls here -.. note:: You can also pass a raw socket with a `fileno()` method to TwoPartyClient -.. note:: This will not work with SSL/TLS, please see :ref:`rpc-asyncio-client` + asyncio.run(capnp.run(main())) +Client +~~~~~~ .. _rpc-asyncio-client: -Starting a Client (asyncio) -########################### -Asyncio takes a bit more boilerplate than using the socket wrapper, but it gives you a lot more control. The example here is very simplistic. Here's an example of full error handling (with reconnection on server failure): `hidio client `_. - -At a basic level, asyncio splits the input and output streams of the tcp socket and sends it to the libcapnp TwoWayPipe interface. An async reader Python function/method is used to consume the incoming byte stream and an async writer Python function/method is used to write outgoing bytes to the socket. - -.. note:: You'll need to be using the async keyword on some of the Python function/methods. If you're unsure, look at the full `example code `_. Also, read up on recent Python asyncio tutorials if you're new to the concept. Make sure the tutorial is 3.7+, asyncio changed a lot from when it was first introduced in 3.4. - -First you'll need two basic async functions:: - - async def myreader(client, reader): - while True: - data = await reader.read(4096) - client.write(data) - - - async def mywriter(client, writer): - while True: - data = await client.read(4096) - writer.write(data.tobytes()) - await writer.drain() - -.. note:: There's no socket error handling here, so this won't be sufficient for anything beyond a simple example. +Starting a Client +################# -Next you'll need to define an async function that sets up the socket connection. This is equivalent to `client = capnp.TwoPartyClient('localhost:60000')` in the earlier example:: +The first step is to open the socket to the server. This is a For now this needs to be done +through `capnp.AsyncIoStream.create_connection`. Its a thin wrapper around `asyncio.get_running_loop().create_connection()` +that adds all required Protocol handling. - async def main(host): - addr = 'localhost' + async def main(): + host = 'localhost' port = '6000' + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port) + + asyncio.run(capnp.run(main())) - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - reader, writer = await asyncio.open_connection( - addr, port, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - reader, writer = await asyncio.open_connection( - addr, port, - family=socket.AF_INET6 - ) - - # Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode) - client = capnp.TwoPartyClient() - - # Assemble reader and writer tasks, run in the background - coroutines = [myreader(client, reader), mywriter(client, writer)] - asyncio.gather(*coroutines, return_exceptions=True) +In the next step this created connection can be passed to `capnp.TwoPartyClient` to create the client object. + async def main(): + host = 'localhost' + port = '6000' + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port) + client = capnp.TwoPartyClient(connection) ## Bootstrap Here ## -.. note:: On systems that have both IPv4 and IPv6 addresses, IPv6 is often resolved first and needs to be handled separately. If you're certain IPv6 won't be used, you can remove it (you should also avoid localhost, and stick to something like 127.0.0.1). - -Finally, you'll need to start the asyncio function:: - - if __name__ == '__main__': - asyncio.run(main(parse_args().host)) - -.. note:: This is the simplest way to start asyncio and usually not sufficient for most applications. + asyncio.run(capnp.run(main())) SSL/TLS Client ^^^^^^^^^^^^^^ -SSL/TLS setup effectively wraps the socket transport. You'll need an SSL certificate, for this example we'll be using a self-signed certificate. Most of the asyncio setup is the same as above:: +SSL/TLS setup effectively wraps the socket transport. You'll need an SSL certificate, for this example we'll be using a self-signed certificate. +Since we wrap around the asyncio connection interface the SSL/TLS setup is done through the `ssl` parameter of `create_connection`. - async def main(host): - addr = 'localhost' + async def main(): + host = 'localhost' port = '6000' - # Setup SSL context - ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=os.path.join(this_dir, 'selfsigned.cert')) - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - reader, writer = await asyncio.open_connection( - addr, port, - ssl=ctx, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - reader, writer = await asyncio.open_connection( - addr, port, - ssl=ctx, - family=socket.AF_INET6 - ) - - # Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode) - client = capnp.TwoPartyClient() - - # Assemble reader and writer tasks, run in the background - coroutines = [myreader(client, reader), mywriter(client, writer)] - asyncio.gather(*coroutines, return_exceptions=True) + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=os.path.join(this_dir, "selfsigned.cert")) + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port, ssl=ctx, family=socket.AF_INET) + client = capnp.TwoPartyClient(connection) ## Bootstrap Here ## -Due to a `bug `_ in Python 3.7 and 3.8 asyncio client needs to be initialized in a slightly different way:: + asyncio.run(capnp.run(main())) + + +Due to a `bug `_ in Python 3.8 asyncio client needs to be initialized in a slightly different way:: if __name__ == '__main__': loop = asyncio.get_event_loop() - loop.run_until_complete(main(parse_args().host)) + loop.run_until_complete(capnp.run(main(parse_args().host))) Bootstrap @@ -491,141 +443,43 @@ The shorter syntax for calling methods is:: The major shortcoming with this method is that expressing complex fields with many nested sub-structs can become very tedious. -Once you have a promise, there are 2 ways of getting to the result. The first is to wait for it:: - - result = eval_promise.wait() - -The second is to build a promise chain by calling `then`:: +The returned promise can be handled like any other asyncio promise:: - def do_stuff(val): - ... - - eval_promise.then(do_stuff).wait() + result = await eval_promise() Pipelining ########## -If a method returns values that are themselves capabilites, then you can access these fields before having to call `wait`. Doing this is called pipelining, and it allows Cap'n Proto to chain the calls without a round-trip occurring to the server:: +If a method returns values that are themselves capabilites, then you can access these fields before awaiting the promise. Doing this is called pipelining, and it allows Cap'n Proto to chain the calls without a round-trip occurring to the server:: # evaluate returns `value` which is itself an interface. # You can call a new method on `value` without having to call wait first read_promise = eval_promise.value.read() - read_result = read_promise.wait() # only 1 wait call - -You can also chain promises with `then` and the same pipelining will occur:: - - read_result = eval_promise.then(lambda ret: ret.value.read()).wait() - + read_result = await read_promise # only 1 await call Server ~~~~~~ -There are two ways to start a server: libcapnp socket wrapper and asyncio. -The wrapper is easier to implement but is very limited (doesn't support SSL/TLS with Python). -asyncio requires more setup and can be harder to debug; however, it does support SSL/TLS and has more control over the socket error conditions. asyncio also helps get around the threading limitations around the current pycapnp implementation has with libcapnp (pycapnp objects and functions must all be in the same thread). The asyncio Server is a bit more work to implement than an asyncio client as more error handling is required to deal with client connection/disconnection/timeout events. - Starting a Server -################# - -To start a server:: - - server = capnp.TwoPartyServer('*:60000', bootstrap=CalculatorImpl()) - server.run_forever() - -.. note:: You can also pass a raw socket with a `fileno()` method to TwoPartyServer. In that case, `run_forever` will not work, and you will have to use `on_disconnect.wait()`. -.. note:: This will not work with SSL/TLS, please see :ref:`rpc-asyncio-server` +########################### +Just like the asyncio client uses a asyncio client, the server uses a +asyncio server that can be created wit `capnp.AsyncIoStream.create_server`. -.. _rpc-asyncio-server: +We need an additional `new_connection()` method in the `Server` class to handle each of the incoming socket connections:: -Starting a Server (asyncio) -########################### -Like the asyncio client, an asyncio server takes a bunch of boilerplate as opposed to using the socket wrapper. Servers generally have to handle a lot more error conditions than clients so they are generally more complicated to implement with asyncio. - -Just like the asyncio client, both the input and output socket streams are handled by reader/writer callback functions/methods. - -.. note:: You'll need to be using the async keyword on some of the Python function/methods. If you're unsure, look at the full `example code `_. Also, read up on recent Python asyncio tutorials if you're new to the concept. Make sure the tutorial is 3.7+, asyncio changed a lot from when it was first introduced in 3.4. - -To simplify the callbacks use a server class to define the reader/writer callbacks.:: - - class Server: - async def myreader(self): - while self.retry and not self.reader.at_eof(): - try: - data = await self.reader.read(4096) - await self.server.write(data) - except Exception as err: - print("Unknown myreader err: %s", err) - return False - print("myreader done.") - return True - - async def mywriter(self): - while self.retry: - try: - data = await self.server.read(4096) - self.writer.write(data.tobytes()) - except Exception as err: - print("Unknown mywriter err: %s", err) - return False - print("mywriter done.") - return True - -We need an additional `myserver()` method in the `Server` class to handle each of the incoming socket connections:: - - async def myserver(self, reader, writer): - # Start TwoPartyServer using TwoWayPipe (only requires bootstrap) - self.server = capnp.TwoPartyServer(bootstrap=CalculatorImpl()) - self.reader = reader - self.writer = writer - self.retry = True - - # Assemble reader and writer tasks, run in the background - coroutines = [self.myreader(), self.mywriter()] - tasks = asyncio.gather(*coroutines, return_exceptions=True) - - while True: - self.server.poll_once() - # Check to see if reader has been sent an eof (disconnect) - if self.reader.at_eof(): - self.retry = False - break - await asyncio.sleep(0.01) - - # Make wait for reader/writer to finish (prevent possible resource leaks) - await tasks - -Finally, we'll need to start an asyncio server to spawn a new async `myserver()` with it's own `Server()` object for each new connection:: - - async def new_connection(reader, writer): - server = Server() - await server.myserver(reader, writer) + async def new_connection(stream): + await capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl()).on_disconnect() async def main(): - addr = 'localhost' - port = '60000' - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - server = await asyncio.start_server( - new_connection, - addr, port, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - server = await asyncio.start_server( - new_connection, - addr, port, - family=socket.AF_INET6 - ) - + host = 'localhost' + port = '6000' + server = await capnp.AsyncIoStream.create_server(new_connection, host, port) async with server: await server.serve_forever() - if __name__ == '__main__': - asyncio.run(main()) + if __name__ == "__main__": + asyncio.run(capnp.run(main())) .. note:: On systems that have both IPv4 and IPv6 addresses, IPv6 is often resolved first and needs to be handled separately. If you're certain IPv6 won't be used, you can remove it (you should also avoid localhost, and stick to something like 127.0.0.1). If you're broadcasting in general, you'll probably want to use `0.0.0.0` (IPv4) or `::/0` (IPv6). @@ -634,35 +488,29 @@ SSL/TLS Server ^^^^^^^^^^^^^^ Adding SSL/TLS support for a pycapnp asyncio server is fairly straight-forward. Just create an SSL context before starting the asyncio server:: + async def new_connection(stream): + await capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl()).on_disconnect() + async def main(): - addr = 'localhost' - port = '60000' + host = 'localhost' + port = '6000' # Setup SSL context ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ctx.load_cert_chain(os.path.join(this_dir, 'selfsigned.cert'), os.path.join(this_dir, 'selfsigned.key')) - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - server = await asyncio.start_server( - new_connection, - addr, port, - ssl=ctx, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - server = await asyncio.start_server( - new_connection, - addr, port, - ssl=ctx, - family=socket.AF_INET6 - ) - + ctx.load_cert_chain( + os.path.join(this_dir, "selfsigned.cert"), + os.path.join(this_dir, "selfsigned.key"), + ) + + server = await capnp.AsyncIoStream.create_server( + new_connection, host, port, ssl=ctx, family=socket.AF_INET + ) async with server: await server.serve_forever() + if __name__ == "__main__": + asyncio.run(capnp.run(main())) + Implementing a Server #####################