Skip to content

Python framework for AsyncAPI-documented Websocket, PubSub, and other async constructs

License

Notifications You must be signed in to change notification settings

noteable-io/asyncapi-eventrouter

asyncapi-eventrouter

Work in Progress

Write Python code for Event-Driven Architectures! The asyncapi-eventrouter prototype library creates Websocket, PubSub, and other asynchronous frameworks with message validation and automatic schema documentation in the AsyncAPI specification. It's heavily inspired by how FastAPI documents REST endpoints in the OpenAPI specification.

Example

What would the Streetlights API look like in Python code with asyncapi-eventrouter?

# asyncapi.py
from pydantic import BaseModel, Field
from datetime import datetime
from asyncapi_eventrouter import Application

asyncapi_app = Application()

class LightMeasured(BaseModel):
    id: int = Field(..., gte=0, description="ID of the streetlight.")
    lumens: int = Field(..., gte=0, description="Light intensity measured in lumens.")
    sentAt: datetime = Field(..., description="Date and time when the message was sent.")

@asyncapi_app.subscribe(channel_name="light/measured",
                        event_name="LightMeasured")
async def record_measurement(measurement: LightMeasured):
    # record to db or take some other action
    return {'received': datetime.now()}
# main.py
from fastapi import FastAPI, WebSocket
from asyncapi import asyncapi_app

app = FastAPI()

@app.websocket('/ws')
async def ws(websocket: Websocket):
    await ws.accept()
    while True:
        content = await websocket.receive_text()
        response = await asyncapi_app.process(content)
        await websocket.send_json(response)

@app.get('/ws-schema')
async def ws_schema():
    return asyncapi_app.schema()
# client.py
import asyncio
import json
from datetime import datetime

import websockets

async def send_measurement():
    data = {"id": 123, "lumens": 42, "sentAt": datetime.now().isoformat()}
    event = {"channel": "light/measured", "event": "LightMeasured", "data": data}
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        await ws.send(json.dumps(event))

if __name__ == '__main__':
    asyncio.run(send_measurement())

Last but not least, you can view the spec by visiting that /ws-schema endpoint or by importing the asyncapi_app and dumping the schema.

from asyncapi import asyncapi_app
import yaml

print(yaml.dump(asyncapi_app.schema())

asyncapi: 2.2.0
channels:
  light/measured:
    publish:
      LightMeasured:
        payload:
          properties:
            id:
              description: ID of the streetlight.
              gte: 0
              title: Id
              type: integer
            lumens:
              description: Light intensity measured in lumens.
              gte: 0
              title: Lumens
              type: integer
            sentAt:
              description: Date and time when the message was sent.
              format: date-time
              title: Sentat
              type: string
          required:
          - id
          - lumens
          - sentAt
          title: LightMeasured
          type: object

Development

This project uses poetry and pre-commit for development.

  1. poetry env use 3.9 will create a .venv directory in your asyncapi-eventrouter directory.
  2. poetry install will install asyncapi-eventrouter and all dependencies into that virtual environment.
  3. pre-commit run --all-files will show you what will be executed any time you git commit.

Skip Pre-commit

If the pre-commit hooks can not be easily resolved, you can still commit using git commit --no-verify.

Testing

This project uses tox and pytest for testing. You'll need to have installed tox on your system Python. To run tests, just execute tox (tox -q for less verbosity, tox -p to run in parallel).

You'll also need a system installation for each Python version you want to test with tox (e.g. python3.7, python3.8, python3.9, python3.10). Installation steps may be different for each OS. On Ubuntu 20.04, it is a matter of adding the deadsnakes repository (sudo add-apt-repository ppa:deadsnakes/ppa) and installing the version you want: sudo apt install python3.10.

Inspiration and gratitude

The useful FastAPI project inspires this project. Any code snippets from FastAPI are given credit and attribution in the source code. We are thankful to the FastAPI community for their work.

About

Python framework for AsyncAPI-documented Websocket, PubSub, and other async constructs

Resources

License

Code of conduct

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages