The Databricks dialect for SQLAlchemy serves as a bridge between SQLAlchemy and the Databricks SQL Python driver. A working example demonstrating usage can be found in sqlalchemy_example.py.
To install the dialect and its dependencies:
pip install databricks-sqlalchemy
If you also plan to use alembic, you can additionally run:
pip install alembic
Every SQLAlchemy application that connects to a database needs to use an Engine, which you can create by passing a connection string to create_engine. The connection string must include these components:
- Host
- HTTP Path for a compute resource
- API access token
- Initial catalog for the connection
- Initial schema for the connection
Note: Our dialect is built and tested on workspaces with Unity Catalog enabled. Support for the hive_metastore catalog is untested.
For example:
import os
from sqlalchemy import create_engine

host = os.getenv("DATABRICKS_SERVER_HOSTNAME")
http_path = os.getenv("DATABRICKS_HTTP_PATH")
access_token = os.getenv("DATABRICKS_TOKEN")
catalog = os.getenv("DATABRICKS_CATALOG")
schema = os.getenv("DATABRICKS_SCHEMA")

engine = create_engine(
    f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}"
)
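Because os.getenv returns None for an unset variable, the f-string above would silently produce a URL containing the literal text "None". The following is a minimal sketch of one way to guard against that and to percent-encode each component, using only the standard library. The helper names build_databricks_url and url_from_env are hypothetical and are not part of the dialect.

```python
import os
from urllib.parse import quote, urlencode


def build_databricks_url(host, http_path, access_token, catalog, schema):
    """Assemble a databricks:// connection string (hypothetical helper).

    The token and the query values are percent-encoded so that characters
    such as "/" or "@" cannot break the URL structure.
    """
    query = urlencode({"http_path": http_path, "catalog": catalog, "schema": schema})
    return f"databricks://token:{quote(access_token, safe='')}@{host}?{query}"


def url_from_env(env=os.environ):
    """Read the five components from the environment, failing fast if any is unset."""
    names = {
        "host": "DATABRICKS_SERVER_HOSTNAME",
        "http_path": "DATABRICKS_HTTP_PATH",
        "access_token": "DATABRICKS_TOKEN",
        "catalog": "DATABRICKS_CATALOG",
        "schema": "DATABRICKS_SCHEMA",
    }
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return build_databricks_url(**{key: env[var] for key, var in names.items()})
```

The resulting string can be passed to create_engine in place of the f-string shown earlier.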
The SQLAlchemy type hierarchy contains backend-agnostic type implementations (represented in CamelCase) and backend-specific types (represented in UPPERCASE). The majority of SQLAlchemy's CamelCase types are supported. This means that a SQLAlchemy application using these types should "just work" with Databricks.
| SQLAlchemy Type | Databricks SQL Type |
|---|---|
| BigInteger | BIGINT |
| LargeBinary | (not supported) |
| Boolean | BOOLEAN |
| Date | DATE |
| DateTime | TIMESTAMP_NTZ |
| Double | DOUBLE |
| Enum | (not supported) |
| Float | FLOAT |
| Integer | INT |
| Numeric | DECIMAL |
| PickleType | (not supported) |
| SmallInteger | SMALLINT |
| String | STRING |
| Text | STRING |
| Time | STRING |
| Unicode | STRING |
| UnicodeText | STRING |
| Uuid | STRING |
In addition, the dialect exposes three UPPERCASE SQLAlchemy types which are specific to Databricks:
LargeBinary() and PickleType() are not supported: Databricks Runtime doesn't currently support binding of binary values in SQL queries, which is a prerequisite for these types in SQLAlchemy.
Support for CHECK constraints is not implemented in this dialect. Support is planned for a future release.
SQLAlchemy's Enum() type depends on CHECK constraints and is therefore not yet supported.
Databricks Runtime provides two datetime-like types: TIMESTAMP, which is always timezone-aware, and TIMESTAMP_NTZ, which is timezone-agnostic. Both types can be imported from databricks.sqlalchemy and used in your models.
The SQLAlchemy documentation indicates that DateTime() is not timezone-aware by default, so our dialect maps this type to TIMESTAMP_NTZ(). In practice, you should never need to use TIMESTAMP_NTZ() directly. Just use DateTime().
If you need your field to be timezone-aware, you can import TIMESTAMP() and use it instead.
Note that the SQLAlchemy documentation suggests that you can declare a DateTime() with timezone=True on supported backends. However, if you do this with the Databricks dialect, the timezone argument will be ignored.
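from sqlalchemy import Column, DateTime
from databricks.sqlalchemy import TIMESTAMP

# Base is assumed to be your application's declarative base
class SomeModel(Base):
    # DateTime() maps to TIMESTAMP_NTZ (no timezone)
    some_date_without_timezone = Column(DateTime())
    # TIMESTAMP() is always timezone-aware
    some_date_with_timezone = Column(TIMESTAMP())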
Databricks Runtime doesn't support length limitations for STRING fields. Therefore String(), String(1), and String(255) all produce identical DDL. Since Text(), Unicode(), and UnicodeText() all use the same underlying type in Databricks SQL, they generate equivalent DDL.
Databricks Runtime doesn't have a native time-like data type. To implement this type in SQLAlchemy, our dialect stores SQLAlchemy Time() values in a STRING field. Unlike DateTime above, this type can optionally support timezone awareness (since the dialect is in complete control of the strings that we write to the Delta table).
from sqlalchemy import Column, Time

# Base is assumed to be your application's declarative base
class SomeModel(Base):
    time_tz = Column(Time(timezone=True))
    time_ntz = Column(Time())
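To see why a STRING column can carry optional timezone information, consider the sketch below. It round-trips time values through text using the standard library's ISO 8601 helpers. This is only an illustration of the idea: the exact string format the dialect writes is not described here, and encode_time and decode_time are hypothetical names.

```python
from datetime import time, timedelta, timezone


def encode_time(value: time) -> str:
    """Serialize a time to text; an offset is appended only if tzinfo is set."""
    return value.isoformat()


def decode_time(raw: str) -> time:
    """Parse the text back into a time, restoring the offset when present."""
    return time.fromisoformat(raw)


naive = time(13, 30, 15)
aware = time(13, 30, 15, tzinfo=timezone(timedelta(hours=-5)))

naive_text = encode_time(naive)  # no offset in the stored string
aware_text = encode_time(aware)  # offset is part of the stored string
```

Because the offset lives inside the string itself, the storage layer needs no native timezone support for the value to survive a round trip.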
Identity and generated value support is currently limited in this dialect.
When defining models, SQLAlchemy types can accept an autoincrement argument. In our dialect, this argument is currently ignored. To create an auto-incrementing field in your model, pass an explicit Identity() instead.
Furthermore, in Databricks Runtime, only BIGINT fields can be configured to auto-increment, so in SQLAlchemy you must use the BigInteger() type.
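from sqlalchemy import BigInteger, Column, Identity, String

# Base is assumed to be your application's declarative base
class SomeModel(Base):
    # Identity() must be attached to a BigInteger column
    id = Column(BigInteger, Identity())
    value = Column(String())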
When calling Base.metadata.create_all(), the executed DDL will include GENERATED ALWAYS AS IDENTITY for the id column. This is useful when using SQLAlchemy to generate tables. However, as of this writing, Identity() constructs are not captured when SQLAlchemy reflects a table's metadata (support for this is planned).
databricks-sql-connector supports two approaches to parameterizing SQL queries: native and inline. Our SQLAlchemy 2.0 dialect always uses the native approach and is therefore limited to DBR 14.2 and above. If you are writing parameterized queries to be executed by SQLAlchemy, you must use the "named" paramstyle (:param). Read more about parameterization in docs/parameters.md.
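As an illustration of the "named" paramstyle, the sketch below uses SQLAlchemy's text() construct with :name placeholders and passes values as dictionaries. To keep it runnable without a workspace, it assumes an in-memory SQLite engine as a stand-in; against Databricks you would use the databricks:// engine created earlier. The table and column names are made up for the example.

```python
from sqlalchemy import create_engine, text

# Stand-in engine (assumption): swap in your databricks:// engine in practice.
engine = create_engine("sqlite://")

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE squares (x INTEGER, x_squared INTEGER)"))

    # Each :name placeholder is filled from the matching dictionary key.
    conn.execute(
        text("INSERT INTO squares (x, x_squared) VALUES (:x, :x_squared)"),
        [{"x": i, "x_squared": i * i} for i in range(5)],
    )

    result = conn.execute(
        text("SELECT x, x_squared FROM squares WHERE x > :min_x ORDER BY x"),
        {"min_x": 2},
    )
    rows = [tuple(row) for row in result]
```

Values are never interpolated into the SQL string by hand; they travel separately as bound parameters.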
Use pandas.DataFrame.to_sql and pandas.read_sql to write to and read from Databricks SQL. Both methods accept a SQLAlchemy connection to interact with Databricks.
from sqlalchemy import create_engine
import pandas as pd

engine = create_engine("databricks://token:dapi***@***.cloud.databricks.com?http_path=***&catalog=main&schema=test")

with engine.connect() as conn:
    # This will read the contents of `main.test.some_table`
    df = pd.read_sql("some_table", conn)
from sqlalchemy import create_engine
import pandas as pd

engine = create_engine("databricks://token:dapi***@***.cloud.databricks.com?http_path=***&catalog=main&schema=test")

squares = [(i, i * i) for i in range(100)]
df = pd.DataFrame(data=squares, columns=["x", "x_squared"])

with engine.connect() as conn:
    # This will write the contents of `df` to `main.test.squares`
    df.to_sql("squares", conn)
Unity Catalog workspaces in Databricks support PRIMARY KEY and FOREIGN KEY constraints. Note that Databricks Runtime does not enforce the integrity of FOREIGN KEY constraints. You can establish a primary key by setting primary_key=True when defining a column.
When building ForeignKey or ForeignKeyConstraint objects, you must specify a name for the constraint.
If your model definition requires a self-referential FOREIGN KEY constraint, you must include use_alter=True when defining the relationship.
from sqlalchemy import BigInteger, Column, ForeignKey, MetaData, String, Table

metadata_obj = MetaData()

users = Table(
    "users",
    metadata_obj,
    Column("id", BigInteger, primary_key=True),
    Column("name", String(), nullable=False),
    Column("email", String()),
    # Self-referential foreign key: needs an explicit name and use_alter=True
    Column("manager_id", ForeignKey("users.id", name="fk_users_manager_id_x_users_id", use_alter=True)),
)