add api, distributed setup #135

Open
wants to merge 31 commits into
base: main

alex-dixon
Contributor

@alex-dixon alex-dixon commented Aug 12, 2024

This PR aims to provide a unified architecture for two primary use cases:

  1. A Tensorboard-like experience where ell writes lmps and traces to the local file system and studio visualizes them
  2. A Langsmith-like experience where lmps and studio run on different machines and data is written to arbitrary data stores

To support ell’s features across both, we add a pub sub implementation to enable realtime updates when ell is used with network databases.

A new API server allows the core library to delegate responsibility for writing lmps and traces in a distributed context. This also creates space for ell’s backend to grow, to the extent it may write to multiple databases or other backend services. In this PR, it writes to the configured database and broadcasts its writes to the pub sub implementation.

Studio has been updated to optionally subscribe to these broadcasts and republish them to websocket clients.
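
A rough sketch of the write-then-broadcast flow described above, using an in-memory stand-in for the pub sub layer. The names `PubSub`, `InMemoryPubSub`, `ApiServer`, and the topic `lmp/written` are hypothetical and chosen for illustration; they are not necessarily the interfaces in this PR.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Protocol, Tuple

Handler = Callable[[str, dict], Awaitable[None]]


class PubSub(Protocol):
    """Hypothetical interface; a network backend such as MQTT could implement it."""

    async def publish(self, topic: str, message: dict) -> None: ...

    def subscribe(self, topic: str, handler: Handler) -> None: ...


class InMemoryPubSub:
    """Stand-in for the case where everything runs in one process."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers.setdefault(topic, []).append(handler)

    async def publish(self, topic: str, message: dict) -> None:
        for handler in self._handlers.get(topic, []):
            await handler(topic, message)


class ApiServer:
    """Writes to the store first, then broadcasts the write."""

    def __init__(self, store: list, pubsub: PubSub) -> None:
        self.store = store
        self.pubsub = pubsub

    async def write_lmp(self, lmp: dict) -> None:
        self.store.append(lmp)  # stands in for the database write
        await self.pubsub.publish("lmp/written", lmp)


async def demo() -> List[Tuple[str, dict]]:
    received: List[Tuple[str, dict]] = []  # stands in for websocket clients

    async def forward_to_websockets(topic: str, message: dict) -> None:
        # Studio's role: republish each broadcast to connected clients.
        received.append((topic, message))

    pubsub = InMemoryPubSub()
    pubsub.subscribe("lmp/written", forward_to_websockets)
    server = ApiServer(store=[], pubsub=pubsub)
    await server.write_lmp({"lmp_id": "lmp-1", "name": "hello"})
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the pub sub layer behind a small interface like this is one way the same server code could run with or without a network broker.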

If this is OK, here are the next steps:

  • Create an ell API client implementation (Python)
    • Primary use case is to allow lmps to make API calls instead of writing to storage directly
  • Use the client in track
    • By default, use an implementation that writes to SQLite directly as ell does today (Tensorboard/single binary use case)
    • Use the API client implementation if ell is configured with an API url
  • Create a Docker Compose with ell services
    • Postgres
    • MQTT
    • API server
    • Studio server
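
The client selection in the steps above could look roughly like the following. This is only a sketch: `SQLiteClient`, `ApiClient`, and `make_client` are hypothetical names, and the real clients would carry the actual write logic.

```python
import os
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SQLiteClient:
    """Hypothetical: writes directly to a local SQLite database."""

    storage_dir: str


@dataclass
class ApiClient:
    """Hypothetical: sends writes to the ell API server over HTTP."""

    base_url: str


def make_client(
    storage_dir: Optional[str] = None,
    api_url: Optional[str] = None,
) -> Union[SQLiteClient, ApiClient]:
    # An API URL opts in to the distributed setup.
    if api_url:
        return ApiClient(base_url=api_url.rstrip("/"))
    # Otherwise keep the single-process behavior and write locally.
    return SQLiteClient(storage_dir=storage_dir or os.getcwd())
```

With this shape, `track` would only need to hold a client and call it, without knowing which implementation is behind it.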

I should probably mention I do not have mastery in Python. Overall things seem to work but more testing would be good.

Breaking changes

  • _store in the configurator was replaced with an ell client. Calls to set_store should be replaced with ell.init, passing either the ell SQLite client or just the storage dir as an argument. We can preserve the previous behavior; I just didn’t think it was worth it for where we’re at. This may affect examples, so I’ll look at those.

IMG_3809

@alex-dixon alex-dixon marked this pull request as ready for review August 12, 2024 00:40
def __init__(self, **kwargs):
    super().__init__(**kwargs)

def model_post_init(self, __context):
Contributor Author

This should let us do Config() instead of Config.create() and have the same functionality

# This intends to honor the default we had set in the CLI
storage_dir = os.getcwd()
# todo. better default?
self.storage_dir = os.getcwd()
Contributor Author

I’m ending up with SQLite dbs in various places. Maybe we standardize on ell home as a default?
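
One possible shape for a standardized default, as a sketch only. The `ELL_HOME` environment variable and the `~/.ell` fallback are assumptions made for illustration; nothing in this PR defines them.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def default_storage_dir(env: Optional[Mapping[str, str]] = None) -> Path:
    """Resolve one well-known storage location instead of the current directory."""
    env = os.environ if env is None else env
    home = env.get("ELL_HOME")  # hypothetical variable name
    if home:
        return Path(home).expanduser()
    # Hypothetical fallback: a single per-user directory.
    return Path.home() / ".ell"
```

Resolving the location in one function would keep the CLI and the library from drifting apart on their defaults.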

@alex-dixon alex-dixon marked this pull request as draft August 12, 2024 14:17
Contributor Author

Let me know if you want to restore this. Broadcast was replicated in the new “pub sub” interfaces, and the connection handling moved to the ws handler.

src/ell/studio/pubsub.py (two outdated review threads, resolved)
@alex-dixon
Contributor Author

There’s an error in the api implementation where a parent invocation isn’t in the db at the time a child is being written. Investigating.

@alex-dixon
Contributor Author

I can't repro this in tests. Can't repro it with sqlite either. So I think it's something wrong with the rest api implementation, differences between sqlite and postgres, or a race condition that the api/postgres impl encounters but sqlite doesn't.

2024-08-23 07:28:24 [2024-08-23 14:28:24] INFO     ell.api.config: Resolved config: {
2024-08-23 07:28:24   "storage_dir": null,
2024-08-23 07:28:24   "pg_connection_string": "postgresql://ell_user:ell_password@postgres:5432/ell_db",
2024-08-23 07:28:24   "mqtt_connection_string": "mqtt://mqtt:1883",
2024-08-23 07:28:24   "log_level": 20
2024-08-23 07:28:24 }
2024-08-23 07:28:24 INFO:     Started server process [1]
2024-08-23 07:28:24 INFO:     Waiting for application startup.
2024-08-23 07:28:24 [2024-08-23 14:28:24] INFO     ell.api.server: Starting lifespan
2024-08-23 07:28:24 [2024-08-23 14:28:24] INFO     ell.api.server: Connecting to MQTT broker at mqtt:1883
2024-08-23 07:28:24 [2024-08-23 14:28:24] INFO     ell.api.server: Connected to MQTT
2024-08-23 07:28:24 INFO:     Application startup complete.
2024-08-23 07:28:24 INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
2024-08-23 07:32:35 ERROR:    Exception in ASGI application
2024-08-23 07:32:35 Traceback (most recent call last):
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
2024-08-23 07:32:35     self.dialect.do_execute(
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
2024-08-23 07:32:35     cursor.execute(statement, parameters)
2024-08-23 07:32:35 psycopg2.errors.ForeignKeyViolation: insert or update on table "invocationtrace" violates foreign key constraint "invocationtrace_invocation_consumer_id_fkey"
2024-08-23 07:32:35 DETAIL:  Key (invocation_consumer_id)=(invocation-c5764323a75a09e3688586a73ec82482) is not present in table "invocation".
2024-08-23 07:32:35 
2024-08-23 07:32:35 
2024-08-23 07:32:35 The above exception was the direct cause of the following exception:
2024-08-23 07:32:35 
2024-08-23 07:32:35 Traceback (most recent call last):
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 399, in run_asgi
2024-08-23 07:32:35     result = await app(  # type: ignore[func-returns-value]
2024-08-23 07:32:35              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
2024-08-23 07:32:35     return await self.app(scope, receive, send)
2024-08-23 07:32:35            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
2024-08-23 07:32:35     await super().__call__(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/applications.py", line 123, in __call__
2024-08-23 07:32:35     await self.middleware_stack(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
2024-08-23 07:32:35     raise exc
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
2024-08-23 07:32:35     await self.app(scope, receive, _send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
2024-08-23 07:32:35     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
2024-08-23 07:32:35     raise exc
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
2024-08-23 07:32:35     await app(scope, receive, sender)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 756, in __call__
2024-08-23 07:32:35     await self.middleware_stack(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 776, in app
2024-08-23 07:32:35     await route.handle(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 297, in handle
2024-08-23 07:32:35     await self.app(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 77, in app
2024-08-23 07:32:35     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
2024-08-23 07:32:35     raise exc
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
2024-08-23 07:32:35     await app(scope, receive, sender)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 72, in app
2024-08-23 07:32:35     response = await func(request)
2024-08-23 07:32:35                ^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 278, in app
2024-08-23 07:32:35     raw_response = await run_endpoint_function(
2024-08-23 07:32:35                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
2024-08-23 07:32:35     return await dependant.call(**values)
2024-08-23 07:32:35            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/app/ell/api/server.py", line 137, in write_invocation
2024-08-23 07:32:35     _invo = serializer.write_invocation(
2024-08-23 07:32:35             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/app/ell/stores/sql.py", line 82, in write_invocation
2024-08-23 07:32:35     session.commit()
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2017, in commit
2024-08-23 07:32:35     trans.commit(_to_root=True)
2024-08-23 07:32:35   File "<string>", line 2, in commit
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
2024-08-23 07:32:35     ret_value = fn(self, *arg, **kw)
2024-08-23 07:32:35                 ^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1302, in commit
2024-08-23 07:32:35     self._prepare_impl()
2024-08-23 07:32:35   File "<string>", line 2, in _prepare_impl
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
2024-08-23 07:32:35     ret_value = fn(self, *arg, **kw)
2024-08-23 07:32:35                 ^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1277, in _prepare_impl
2024-08-23 07:32:35     self.session.flush()
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 4341, in flush
2024-08-23 07:32:35     self._flush(objects)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 4476, in _flush
2024-08-23 07:32:35     with util.safe_reraise():
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
2024-08-23 07:32:35     raise exc_value.with_traceback(exc_tb)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 4437, in _flush
2024-08-23 07:32:35     flush_context.execute()
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 466, in execute
2024-08-23 07:32:35     rec.execute(self)
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 642, in execute
2024-08-23 07:32:35     util.preloaded.orm_persistence.save_obj(
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 93, in save_obj
2024-08-23 07:32:35     _emit_insert_statements(
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1048, in _emit_insert_statements
2024-08-23 07:32:35     result = connection.execute(
2024-08-23 07:32:35              ^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
2024-08-23 07:32:35     return meth(
2024-08-23 07:32:35            ^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
2024-08-23 07:32:35     return connection._execute_clauseelement(
2024-08-23 07:32:35            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
2024-08-23 07:32:35     ret = self._execute_context(
2024-08-23 07:32:35           ^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
2024-08-23 07:32:35     return self._exec_single_context(
2024-08-23 07:32:35            ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
2024-08-23 07:32:35     self._handle_dbapi_exception(
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2353, in _handle_dbapi_exception
2024-08-23 07:32:35     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
2024-08-23 07:32:35     self.dialect.do_execute(
2024-08-23 07:32:35   File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
2024-08-23 07:32:35     cursor.execute(statement, parameters)
2024-08-23 07:32:35 sqlalchemy.exc.IntegrityError: (psycopg2.errors.ForeignKeyViolation) insert or update on table "invocationtrace" violates foreign key constraint "invocationtrace_invocation_consumer_id_fkey"
2024-08-23 07:32:35 DETAIL:  Key (invocation_consumer_id)=(invocation-c5764323a75a09e3688586a73ec82482) is not present in table "invocation".
2024-08-23 07:32:35 
2024-08-23 07:32:35 [SQL: INSERT INTO invocationtrace (invocation_consumer_id, invocation_consuming_id) VALUES (%(invocation_consumer_id)s, %(invocation_consuming_id)s)]
2024-08-23 07:32:35 [parameters: {'invocation_consumer_id': 'invocation-c5764323a75a09e3688586a73ec82482', 'invocation_consuming_id': 'invocation-49b8cbf0ccff9e68af6d9e0d85adac2b'}]
2024-08-23 07:32:35 (Background on this error at: https://sqlalche.me/e/20/gkpj)

@alex-dixon
Contributor Author

Adding session.commit() after the invocation and before the traces appears to have fixed this.
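
To illustrate the ordering, here is a small standalone sketch using the standard library's sqlite3 module rather than the SQLAlchemy/Postgres stack from the traceback. The table and column names mirror the ones in the error message, but the schema is simplified and the helper functions are hypothetical. It does not reproduce the race itself; it only shows the constraint failing when the referenced invocation row is missing and passing once the invocation is committed first. SQLite enforces foreign keys only when `PRAGMA foreign_keys` is on, so the sketch enables it explicitly.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # off by default in SQLite
    conn.execute("CREATE TABLE invocation (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE invocationtrace ("
        " invocation_consumer_id TEXT NOT NULL REFERENCES invocation(id),"
        " invocation_consuming_id TEXT NOT NULL REFERENCES invocation(id))"
    )
    return conn


def write_trace_only(conn: sqlite3.Connection, consumer: str, consuming: str) -> bool:
    """Insert a trace without its invocations; returns False on an FK violation."""
    try:
        conn.execute(
            "INSERT INTO invocationtrace VALUES (?, ?)", (consumer, consuming)
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        return False


def write_invocation_then_trace(
    conn: sqlite3.Connection, consumer: str, consuming: str
) -> None:
    """Commit the invocation rows before writing the trace that references them."""
    conn.executemany(
        "INSERT OR IGNORE INTO invocation (id) VALUES (?)",
        [(consumer,), (consuming,)],
    )
    conn.commit()  # the invocations exist before any trace refers to them
    conn.execute("INSERT INTO invocationtrace VALUES (?, ?)", (consumer, consuming))
    conn.commit()
```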

@alex-dixon alex-dixon marked this pull request as ready for review August 23, 2024 16:14
@alex-dixon alex-dixon changed the title from “add pubsub” to “add api, production setup” Aug 23, 2024
@MadcowD
Owner

MadcowD commented Aug 30, 2024

My big question here is: can we have support for both this server-client version and one version where we actually still have SQLite?

@alex-dixon
Contributor Author

@MadcowD Yes :) I took care to preserve that. It should work just as it did before. The distributed implementation is completely optional but works the same.

@alex-dixon alex-dixon changed the title from “add api, production setup” to “add api, distributed setup” Sep 7, 2024
@alex-dixon alex-dixon mentioned this pull request Sep 18, 2024
23 tasks
@MadcowD
Owner

MadcowD commented Sep 19, 2024

we're gonna merge i swear to god

@alex-dixon
Contributor Author

From yesterday’s discussion:

  • Let’s put the docker-compose file somewhere else like in studio
  • How much does this pr add to our “docs budget”?
  • Fewer click throughs to see what code does. Ell’s codebase should not feel labyrinthian. Fewer files is often better
  • Why does everyone else have separate async and sync implementations and shouldn’t we do the same? What’s the overhead for starting our own aio event loop in a sync context?
  • We want to avoid bloat of dependencies like mqtt in our code. So we need a way to allow users to bring some of these features themselves if they wish (without putting every last one in the codebase)
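
On the sync/async question, here is a sketch of the simplest alternative to maintaining two implementations: a sync facade that starts an event loop per call with `asyncio.run`, plus a helper to measure what that costs on a given machine. `AsyncClient`, `SyncClient`, and `per_call_overhead_seconds` are hypothetical names for illustration.

```python
import asyncio
import time


class AsyncClient:
    """Hypothetical async implementation."""

    async def write_invocation(self, invocation: dict) -> str:
        await asyncio.sleep(0)  # stands in for network I/O
        return invocation["id"]


class SyncClient:
    """Sync facade: each call creates and tears down its own event loop."""

    def __init__(self) -> None:
        self._inner = AsyncClient()

    def write_invocation(self, invocation: dict) -> str:
        # asyncio.run raises RuntimeError if a loop is already running in
        # this thread, so this facade is only usable from plain sync code.
        return asyncio.run(self._inner.write_invocation(invocation))


def per_call_overhead_seconds(n: int = 200) -> float:
    """Average wall-clock time per call, dominated by event loop setup here."""
    client = SyncClient()
    start = time.perf_counter()
    for i in range(n):
        client.write_invocation({"id": f"invocation-{i}"})
    return (time.perf_counter() - start) / n


if __name__ == "__main__":
    print(f"{per_call_overhead_seconds() * 1e6:.1f} microseconds per call")
```

The restriction noted in the comment (no nesting inside an already running loop) is one practical reason a library might ship separate sync and async implementations instead of wrapping one in the other.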

@MadcowD
Owner

MadcowD commented Sep 25, 2024

OK, so what we should do here is refactor studio to be a bit more self-contained in the package, with a separate L_studio package, and keep all of the Docker-related files in one directory so as not to confuse developers. That would make this a bit cleaner.

@MadcowD
Owner

MadcowD commented Sep 25, 2024

Can try this weekend
