diff --git a/README.md b/README.md index 56ff47a..9ea2997 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ While not directly supported by pycapnp, a tool has been created to help generat ## Python Versions -Python 3.7+ is supported. +Python 3.8+ is supported. ## Development diff --git a/capnp/lib/capnp.pyx b/capnp/lib/capnp.pyx index dbb92b7..ba74a75 100644 --- a/capnp/lib/capnp.pyx +++ b/capnp/lib/capnp.pyx @@ -438,8 +438,10 @@ cdef class _DynamicListReader: cdef class _DynamicResizableListBuilder: """Class for building growable Cap'n Proto Lists - .. warning:: You need to call :meth:`finish` on this object before serializing the Cap'n Proto message. - Failure to do so will cause your objects not to be written out as well as leaking orphan structs into your message. + .. warning:: + You need to call :meth:`finish` on this object before serializing the + Cap'n Proto message. Failure to do so will cause your objects not to be + written out as well as leaking orphan structs into your message. This class works much like :class:`_DynamicListBuilder`, but it allows growing the list dynamically. It is meant for lists of structs, since for primitive types like int or float, you're much better off @@ -1436,9 +1438,11 @@ cdef class _DynamicStructBuilder: This is only meant for lists of Cap'n Proto objects, since for primitive types you can just define a normal python list and fill it yourself. - .. warning:: You need to call :meth:`_DynamicResizableListBuilder.finish` on the - list object before serializing the Cap'n Proto message. Failure to do so will cause - your objects not to be written out as well as leaking orphan structs into your message. + .. warning:: + You need to call :meth:`_DynamicResizableListBuilder.finish` on the + list object before serializing the Cap'n Proto message. Failure to do + so will cause your objects not to be written out as well as leaking + orphan structs into your message. :type field: str :param field: The field name to initialize @@ -1850,6 +1854,16 @@ cdef class _EventLoop: @_asynccontextmanager async def kj_loop(): + """Context manager for running the KJ event loop + + As long as the context manager is active it is guaranteed that the KJ event + loop is running. When the context manager is exited, the KJ event loop is + shut down properly and pending tasks are cancelled. + + :raises [RuntimeError]: If the KJ event loop is already running (on this thread). + + .. warning:: Every capnp rpc call required a running KJ event loop. + """ asyncio_loop = asyncio.get_running_loop() if hasattr(asyncio_loop, '_kj_loop'): raise RuntimeError("The KJ event-loop is already running (on this thread).") @@ -1876,6 +1890,12 @@ async def kj_loop(): kj_loop.close() async def run(coro): + """Ensure that the coroutine runs while the KJ event loop is running + + This is a shortcut for wrapping the coroutine in a :py:meth:`capnp.kj_loop` context manager. + + :param coro: Coroutine to run + """ async with kj_loop(): return await coro @@ -4391,7 +4411,7 @@ def add_import_hook(additional_paths=[]): :type additional_paths: list :param additional_paths: Additional paths, listed as strings, to be used to search for the .capnp files. - It is prepended to the beginning of sys.path. It also affects imports inside of Cap'n Proto schemas. + It is prepended to the beginning of sys.path. It also affects imports inside of Cap'n Proto schemas. """ global _importer if _importer is not None: diff --git a/docs/capnp.rst b/docs/capnp.rst index fda8688..a4f4f80 100644 --- a/docs/capnp.rst +++ b/docs/capnp.rst @@ -14,50 +14,34 @@ Classes RPC ~~~ -Promise -####### - -.. autoclass:: Promise +.. autoclass:: capnp.lib.capnp._RemotePromise :members: :undoc-members: :inherited-members: -Promise may be one of: - -* :meth:`capnp.lib.capnp._Promise` -* :meth:`capnp.lib.capnp._RemotePromise` -* :meth:`capnp.lib.capnp._VoidPromise` - -.. autoclass:: capnp.lib.capnp._Promise - :members: - :undoc-members: - :inherited-members: +Communication +############# -.. autoclass:: capnp.lib.capnp._RemotePromise +.. autoclass:: TwoPartyClient :members: :undoc-members: :inherited-members: -.. autoclass:: capnp.lib.capnp._VoidPromise +.. autoclass:: TwoPartyServer :members: :undoc-members: :inherited-members: - -Communication -############# - -.. autoclass:: TwoPartyClient +.. autoclass:: AsyncIoStream :members: :undoc-members: :inherited-members: -.. autoclass:: TwoPartyServer +.. autoclass:: capnp.lib.capnp._AsyncIoStream :members: :undoc-members: :inherited-members: - Capability ########## @@ -96,16 +80,14 @@ Miscellaneous Functions --------- .. autofunction:: add_import_hook +.. autofunction:: remove_import_hook .. autofunction:: cleanup_global_schema_parser -.. autofunction:: create_event_loop -.. autofunction:: getTimer -.. autofunction:: join_promises + +.. autofunction:: kj_loop +.. autofunction:: run + .. autofunction:: load -.. autofunction:: poll_once -.. autofunction:: remove_event_loop -.. autofunction:: remove_import_hook -.. autofunction:: reset_event_loop -.. autofunction:: wait_forever + Internal Classes diff --git a/docs/conf.py b/docs/conf.py index 7e29d6f..e2c7711 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -307,6 +307,6 @@ # Allow duplicate toc entries. # epub_tocdup = True -intersphinx_mapping = {"http://docs.python.org/": None} +intersphinx_mapping = {"": ("http://docs.python.org/", None)} smv_branch_whitelist = r"^master$" diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 132d22b..775965a 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -65,7 +65,7 @@ Const values ~~~~~~~~~~~~ Const values show up just as you'd expect under the loaded schema. For example:: - print addressbook_capnp.qux + print(addressbook_capnp.qux) # 123 @@ -137,9 +137,9 @@ If you assign an invalid value to one, you will get a ValueError:: alicePhone.type = 'foo' --------------------------------------------------------------------------- - ValueError Traceback (most recent call last) + KjException Traceback (most recent call last) ... - ValueError: src/capnp/schema.c++:326: requirement not met: enum has no such enumerant; name = foo + AttributeError: capnp/schema.c++:566: failed: enum has no such enumerant; name = foo Unions @@ -157,10 +157,11 @@ Also, one weird case is for Void types in Unions (and in general, but Void is re .. note:: One caveat for unions is having structs as union members. Let us assume `employment.school` was actually a struct with a field of type `Text` called `name`:: alice.employment.school.name = "MIT" - # Raises a ValueError + # Raises a KjException The problem is that a struct within a union isn't initialized automatically. You have to do the following:: + TODO Broken school = alice.employment.init('school') school.name = "MIT" @@ -171,8 +172,8 @@ Writing to a File ~~~~~~~~~~~~~~~~~ Once you're done assigning to all the fields in a message, you can write it to a file like so:: - f = open('example.bin', 'w+b') - addresses.write(f) + with open('example.bin', 'wb') as f: + addresses.write(f) There is also a `write_packed` function, that writes out the message more space-efficientally. If you use write_packed, make sure to use read_packed when reading the message. @@ -184,8 +185,8 @@ Reading from a file ~~~~~~~~~~~~~~~~~~~ Much like before, you will have to de-serialize the message from a file descriptor:: - f = open('example.bin', 'rb') - addresses = addressbook_capnp.AddressBook.read(f) + with open('example.bin', 'rb') as f: + addresses = addressbook_capnp.AddressBook.read(f) Note that this very much needs to match the type you wrote out. In general, you will always be sending the same message types out over a given channel or you should wrap all your types in an unnamed union. Unnamed unions are defined in the .capnp file like so:: @@ -234,19 +235,21 @@ As shown in the examples above, there is file serialization with `write()`:: addresses = addressbook_capnp.AddressBook.new_message() ... - f = open('example.bin', 'w+b') - addresses.write(f) + with open('example.bin', 'wb') as f: + addresses.write(f) And similarly for reading:: - f = open('example.bin', 'rb') - addresses = addressbook_capnp.AddressBook.read(f) + with open('example.bin', 'rb') as f: + addresses = addressbook_capnp.AddressBook.read(f) There are packed versions as well:: - addresses.write_packed(f) - f.seek(0) - addresses = addressbook_capnp.AddressBook.read_packed(f) + with open('example.bin', 'wb') as f: + addresses.write_packed(f) + ... + with open('example.bin', 'rb') as f: + addresses = addressbook_capnp.AddressBook.read_packed(f) Multi-message files @@ -255,14 +258,14 @@ The above methods only guaranteed to work if your file contains a single message addresses = addressbook_capnp.AddressBook.new_message() ... - f = open('example.bin', 'w+b') - addresses.write(f) - addresses.write(f) - addresses.write(f) # write 3 messages - f.seek(0) - - for addresses in addressbook_capnp.AddressBook.read_multiple(f): - print addresses + with open('example.bin', 'wb') as f: + addresses.write(f) + addresses.write(f) + addresses.write(f) # write 3 messages + + with open('example.bin', 'rb') as f: + for addresses in addressbook_capnp.AddressBook.read_multiple(f): + print(addresses) There is also a packed version:: @@ -334,136 +337,90 @@ Cap'n Proto has a rich RPC protocol. You should read the `RPC specification `_. Please refer to it to understand the interfaces that will be used. -Asyncio support was added to pycapnp in v1.0.0 utilizing the TwoWayPipe interface to libcapnp (instead of having libcapnp control the socket communication). The main advantage here is that standard Python socket libraries can be used with pycapnp (more importantly, TLS/SSL). Asyncio requires a bit more boiler plate to get started but it does allow for a lot more control than using the pycapnp socket wrapper. +Asyncio support was added to pycapnp in v1.0.0. Since v2.0.0, the usage of asyncio is mandatory for all RPC calls. This guarantees a more robust and flexible RPC implementation. +KJ Event Loop +~~~~~~~~~~~~~ -Client -~~~~~~ +Cap'n Proto uses the KJ event loop for its RPC implementation. Pycapnp handles all the required mapping between the asyncio event loop and the KJ event loop. +To ensure proper creation, usage, and cleanup of the KJ event loop, a context manager called :py:meth:`capnp.kj_loop` is exposed by pycapnp . All RPC calls must be made within this context:: -There are two ways to start a client: libcapnp socket wrapper and asyncio. -The wrapper is easier to implement but is very limited (doesn't support SSL/TLS with Python). -asyncio requires more setup and can be harder to debug; however, it does support SSL/TLS and has more control over the socket error conditions. asyncio also helps get around the threading limitations around the current pycapnp implementation has with libcapnp (pycapnp objects and functions must all be in the same thread). + import capnp + import asyncio + async def main(): + async with capnp.kj_loop(): + # RPC calls here + + asyncio.run(main()) -Starting a Client -################# -Starting a client is very easy:: +To simplify the usage, the helper function:py:meth:`capnp.run` can execute a asyncio coroutine within the :py:meth:`capnp.kj_loop` context manager:: import capnp - import calculator_capnp + import asyncio - client = capnp.TwoPartyClient('localhost:60000') + async def main(): + # RPC calls here -.. note:: You can also pass a raw socket with a `fileno()` method to TwoPartyClient -.. note:: This will not work with SSL/TLS, please see :ref:`rpc-asyncio-client` + asyncio.run(capnp.run(main())) +Client +~~~~~~ .. _rpc-asyncio-client: -Starting a Client (asyncio) -########################### -Asyncio takes a bit more boilerplate than using the socket wrapper, but it gives you a lot more control. The example here is very simplistic. Here's an example of full error handling (with reconnection on server failure): `hidio client `_. +Thanks to the integration into the asyncio library, most of the boiler plate code is handled by pycapnp directly. The only thing that needs to be done is to create a client object and bootstrap the server capability. -At a basic level, asyncio splits the input and output streams of the tcp socket and sends it to the libcapnp TwoWayPipe interface. An async reader Python function/method is used to consume the incoming byte stream and an async writer Python function/method is used to write outgoing bytes to the socket. - -.. note:: You'll need to be using the async keyword on some of the Python function/methods. If you're unsure, look at the full `example code `_. Also, read up on recent Python asyncio tutorials if you're new to the concept. Make sure the tutorial is 3.7+, asyncio changed a lot from when it was first introduced in 3.4. - -First you'll need two basic async functions:: - - async def myreader(client, reader): - while True: - data = await reader.read(4096) - client.write(data) +Starting a Client +################# +The first step is to open a socket to the server. For now this needs to be done +through :py:meth:`~._AsyncIoStream.create_connection`. A thin wrapper around :py:meth:`asyncio.get_running_loop().create_connection()` +that adds all required Protocol handling:: - async def mywriter(client, writer): - while True: - data = await client.read(4096) - writer.write(data.tobytes()) - await writer.drain() + async def main(): + host = 'localhost' + port = '6000' + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port) + + asyncio.run(capnp.run(main())) -.. note:: There's no socket error handling here, so this won't be sufficient for anything beyond a simple example. +.. note:: :py:meth:`~._AsyncIoStream.create_connection` forwards all calls to the underlying asyncio create_connection function. -Next you'll need to define an async function that sets up the socket connection. This is equivalent to `client = capnp.TwoPartyClient('localhost:60000')` in the earlier example:: +In the next step this created connection can be passed to :py:meth:`capnp.TwoPartyClient` to create the client object:: - async def main(host): - addr = 'localhost' + async def main(): + host = 'localhost' port = '6000' - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - reader, writer = await asyncio.open_connection( - addr, port, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - reader, writer = await asyncio.open_connection( - addr, port, - family=socket.AF_INET6 - ) - - # Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode) - client = capnp.TwoPartyClient() - - # Assemble reader and writer tasks, run in the background - coroutines = [myreader(client, reader), mywriter(client, writer)] - asyncio.gather(*coroutines, return_exceptions=True) - + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port) + client = capnp.TwoPartyClient(connection) ## Bootstrap Here ## -.. note:: On systems that have both IPv4 and IPv6 addresses, IPv6 is often resolved first and needs to be handled separately. If you're certain IPv6 won't be used, you can remove it (you should also avoid localhost, and stick to something like 127.0.0.1). - -Finally, you'll need to start the asyncio function:: - - if __name__ == '__main__': - asyncio.run(main(parse_args().host)) - -.. note:: This is the simplest way to start asyncio and usually not sufficient for most applications. + asyncio.run(capnp.run(main())) SSL/TLS Client ^^^^^^^^^^^^^^ -SSL/TLS setup effectively wraps the socket transport. You'll need an SSL certificate, for this example we'll be using a self-signed certificate. Most of the asyncio setup is the same as above:: +SSL/TLS setup effectively wraps the socket transport. You'll need an SSL certificate, for this example, we'll use a self-signed certificate. Since we wrap around the asyncio connection interface, the SSL/TLS setup is done through the :py:obj:`ssl`` parameter of :py:meth:`~._AsyncIoStream.create_connection`:: - async def main(host): - addr = 'localhost' + async def main(): + host = 'localhost' port = '6000' - # Setup SSL context - ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=os.path.join(this_dir, 'selfsigned.cert')) - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - reader, writer = await asyncio.open_connection( - addr, port, - ssl=ctx, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - reader, writer = await asyncio.open_connection( - addr, port, - ssl=ctx, - family=socket.AF_INET6 - ) - - # Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode) - client = capnp.TwoPartyClient() - - # Assemble reader and writer tasks, run in the background - coroutines = [myreader(client, reader), mywriter(client, writer)] - asyncio.gather(*coroutines, return_exceptions=True) + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=os.path.join(this_dir, "selfsigned.cert")) + connection = await capnp.AsyncIoStream.create_connection(host=host, port=port, ssl=ctx, family=socket.AF_INET) + client = capnp.TwoPartyClient(connection) ## Bootstrap Here ## -Due to a `bug `_ in Python 3.7 and 3.8 asyncio client needs to be initialized in a slightly different way:: + asyncio.run(capnp.run(main())) + + +Due to a `bug `_ in Python 3.8 asyncio client needs to be initialized in a slightly different way:: if __name__ == '__main__': loop = asyncio.get_event_loop() - loop.run_until_complete(main(parse_args().host)) + loop.run_until_complete(capnp.run(main(parse_args().host))) Bootstrap @@ -491,141 +448,51 @@ The shorter syntax for calling methods is:: The major shortcoming with this method is that expressing complex fields with many nested sub-structs can become very tedious. -Once you have a promise, there are 2 ways of getting to the result. The first is to wait for it:: - - result = eval_promise.wait() +The returned promise can be handled like any other asyncio promise:: -The second is to build a promise chain by calling `then`:: - - def do_stuff(val): - ... - - eval_promise.then(do_stuff).wait() + result = await eval_promise() Pipelining ########## -If a method returns values that are themselves capabilites, then you can access these fields before having to call `wait`. Doing this is called pipelining, and it allows Cap'n Proto to chain the calls without a round-trip occurring to the server:: +If a method returns values that are themselves capabilites, then you can access these fields before awaiting the promise. Doing this is called pipelining, and it allows Cap'n Proto to chain the calls without a round-trip occurring to the server:: # evaluate returns `value` which is itself an interface. # You can call a new method on `value` without having to call wait first read_promise = eval_promise.value.read() - read_result = read_promise.wait() # only 1 wait call - -You can also chain promises with `then` and the same pipelining will occur:: - - read_result = eval_promise.then(lambda ret: ret.value.read()).wait() - + read_result = await read_promise # only 1 await call Server ~~~~~~ -There are two ways to start a server: libcapnp socket wrapper and asyncio. -The wrapper is easier to implement but is very limited (doesn't support SSL/TLS with Python). -asyncio requires more setup and can be harder to debug; however, it does support SSL/TLS and has more control over the socket error conditions. asyncio also helps get around the threading limitations around the current pycapnp implementation has with libcapnp (pycapnp objects and functions must all be in the same thread). The asyncio Server is a bit more work to implement than an asyncio client as more error handling is required to deal with client connection/disconnection/timeout events. - Starting a Server -################# - -To start a server:: - - server = capnp.TwoPartyServer('*:60000', bootstrap=CalculatorImpl()) - server.run_forever() +########################### -.. note:: You can also pass a raw socket with a `fileno()` method to TwoPartyServer. In that case, `run_forever` will not work, and you will have to use `on_disconnect.wait()`. -.. note:: This will not work with SSL/TLS, please see :ref:`rpc-asyncio-server` +Like the client, the server uses an asyncio server that can be created with :py:meth:`~._AsyncIoStream.create_server`. +.. note:: :py:meth:`~._AsyncIoStream.create_server`, similar to :py:meth:`~._AsyncIoStream.create_connection`, forwards all arguments to the underlying asyncio create_connection function (with the exception of the first argument). -.. _rpc-asyncio-server: +The first argument to :py:meth:`~._AsyncIoStream.create_server` must be a callback is +used by the pycapnp protocol implementation. The :py:obj:`callback` parameter will be called +whenever a new connection is made. It receives a py:obj:`AsyncIoStream` instance as its +only argument. If the result of py:obj:`callback` is a coroutine, it will be scheduled as a +task. At minimum, the callback should create a :py:class:`capnp.TwoPartyServer` for the +passed stream. :py:class:`capnp.TwoPartyServer` also exposes a +:py:meth:`~.TwoPartyServer.on_disconnect()` function, which can be used as a task to handle +the lifetime properly:: -Starting a Server (asyncio) -########################### -Like the asyncio client, an asyncio server takes a bunch of boilerplate as opposed to using the socket wrapper. Servers generally have to handle a lot more error conditions than clients so they are generally more complicated to implement with asyncio. - -Just like the asyncio client, both the input and output socket streams are handled by reader/writer callback functions/methods. - -.. note:: You'll need to be using the async keyword on some of the Python function/methods. If you're unsure, look at the full `example code `_. Also, read up on recent Python asyncio tutorials if you're new to the concept. Make sure the tutorial is 3.7+, asyncio changed a lot from when it was first introduced in 3.4. - -To simplify the callbacks use a server class to define the reader/writer callbacks.:: - - class Server: - async def myreader(self): - while self.retry and not self.reader.at_eof(): - try: - data = await self.reader.read(4096) - await self.server.write(data) - except Exception as err: - print("Unknown myreader err: %s", err) - return False - print("myreader done.") - return True - - async def mywriter(self): - while self.retry: - try: - data = await self.server.read(4096) - self.writer.write(data.tobytes()) - except Exception as err: - print("Unknown mywriter err: %s", err) - return False - print("mywriter done.") - return True - -We need an additional `myserver()` method in the `Server` class to handle each of the incoming socket connections:: - - async def myserver(self, reader, writer): - # Start TwoPartyServer using TwoWayPipe (only requires bootstrap) - self.server = capnp.TwoPartyServer(bootstrap=CalculatorImpl()) - self.reader = reader - self.writer = writer - self.retry = True - - # Assemble reader and writer tasks, run in the background - coroutines = [self.myreader(), self.mywriter()] - tasks = asyncio.gather(*coroutines, return_exceptions=True) - - while True: - self.server.poll_once() - # Check to see if reader has been sent an eof (disconnect) - if self.reader.at_eof(): - self.retry = False - break - await asyncio.sleep(0.01) - - # Make wait for reader/writer to finish (prevent possible resource leaks) - await tasks - -Finally, we'll need to start an asyncio server to spawn a new async `myserver()` with it's own `Server()` object for each new connection:: - - async def new_connection(reader, writer): - server = Server() - await server.myserver(reader, writer) + async def new_connection(stream): + await capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl()).on_disconnect() async def main(): - addr = 'localhost' - port = '60000' - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - server = await asyncio.start_server( - new_connection, - addr, port, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - server = await asyncio.start_server( - new_connection, - addr, port, - family=socket.AF_INET6 - ) - + host = 'localhost' + port = '6000' + server = await capnp.AsyncIoStream.create_server(new_connection, host, port) async with server: await server.serve_forever() - if __name__ == '__main__': - asyncio.run(main()) + if __name__ == "__main__": + asyncio.run(capnp.run(main())) .. note:: On systems that have both IPv4 and IPv6 addresses, IPv6 is often resolved first and needs to be handled separately. If you're certain IPv6 won't be used, you can remove it (you should also avoid localhost, and stick to something like 127.0.0.1). If you're broadcasting in general, you'll probably want to use `0.0.0.0` (IPv4) or `::/0` (IPv6). @@ -634,35 +501,29 @@ SSL/TLS Server ^^^^^^^^^^^^^^ Adding SSL/TLS support for a pycapnp asyncio server is fairly straight-forward. Just create an SSL context before starting the asyncio server:: + async def new_connection(stream): + await capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl()).on_disconnect() + async def main(): - addr = 'localhost' - port = '60000' + host = 'localhost' + port = '6000' # Setup SSL context ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ctx.load_cert_chain(os.path.join(this_dir, 'selfsigned.cert'), os.path.join(this_dir, 'selfsigned.key')) - - # Handle both IPv4 and IPv6 cases - try: - print("Try IPv4") - server = await asyncio.start_server( - new_connection, - addr, port, - ssl=ctx, - family=socket.AF_INET - ) - except Exception: - print("Try IPv6") - server = await asyncio.start_server( - new_connection, - addr, port, - ssl=ctx, - family=socket.AF_INET6 - ) - + ctx.load_cert_chain( + os.path.join(this_dir, "selfsigned.cert"), + os.path.join(this_dir, "selfsigned.key"), + ) + + server = await capnp.AsyncIoStream.create_server( + new_connection, host, port, ssl=ctx, family=socket.AF_INET + ) async with server: await server.serve_forever() + if __name__ == "__main__": + asyncio.run(capnp.run(main())) + Implementing a Server #####################