"""
databricks-sql-connector includes a SQLAlchemy 2.0 dialect compatible with Databricks SQL. To install
its dependencies you can run `pip install databricks-sql-connector[sqlalchemy]`.
The expected connection string format which you can pass to create_engine() is:
databricks://token:dapi***@***.cloud.databricks.com?http_path=/sql/***&catalog=**&schema=**
Our dialect implements the majority of SQLAlchemy 2.0's API. Because of the extent of SQLAlchemy's
capabilities it isn't feasible to provide examples of every usage in a single script, so we only
provide a basic one here. Learn more about usage in README.sqlalchemy.md in this repo.
"""
# fmt: off
import os
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from uuid import UUID
# By convention, backend-specific SQLAlchemy types are defined in uppercase.
# This dialect exposes Databricks SQL's TIMESTAMP and TINYINT types,
# as these are not covered by the generic CamelCase types shown below.
from databricks.sqlalchemy import TIMESTAMP, TINYINT
# Beside the CamelCase types shown below, line comments reflect
# the underlying Databricks SQL / Delta table type
from sqlalchemy import (
    BigInteger,  # BIGINT
    Boolean,     # BOOLEAN
    Column,
    Date,        # DATE
    DateTime,    # TIMESTAMP_NTZ
    Integer,     # INTEGER
    Numeric,     # DECIMAL
    String,      # STRING
    Time,        # STRING
    Uuid,        # STRING
    create_engine,
    select,
)
from sqlalchemy.orm import DeclarativeBase, Session
host = os.getenv("DATABRICKS_SERVER_HOSTNAME")
http_path = os.getenv("DATABRICKS_HTTP_PATH")
access_token = os.getenv("DATABRICKS_TOKEN")
catalog = os.getenv("DATABRICKS_CATALOG")
schema = os.getenv("DATABRICKS_SCHEMA")
# Extra arguments are passed untouched to databricks-sql-connector
# See src/databricks/sql/thrift_backend.py for a complete list
extra_connect_args = {
    "_tls_verify_hostname": True,
    "_user_agent_entry": "PySQL Example Script",
}

engine = create_engine(
    f"databricks://token:{access_token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}",
    connect_args=extra_connect_args,
    echo=True,
)
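
# As an aside (not part of the original example), an equivalent URL can be built
# programmatically with SQLAlchemy's URL helper instead of an f-string. This is
# only a sketch; adjust the query parameters to match your workspace.
from sqlalchemy.engine import URL

url = URL.create(
    drivername="databricks",
    username="token",
    password=access_token,
    host=host,
    query={"http_path": http_path, "catalog": catalog, "schema": schema},
)
# engine = create_engine(url, connect_args=extra_connect_args, echo=True)
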
class Base(DeclarativeBase):
    pass

# This object gives a usage example for each supported type.
# For more details on these, see README.sqlalchemy.md.
class SampleObject(Base):
    __tablename__ = "pysql_sqlalchemy_example_table"

    bigint_col = Column(BigInteger, primary_key=True)
    string_col = Column(String)
    tinyint_col = Column(TINYINT)
    int_col = Column(Integer)
    numeric_col = Column(Numeric(10, 2))
    boolean_col = Column(Boolean)
    date_col = Column(Date)
    datetime_col = Column(TIMESTAMP)
    datetime_col_ntz = Column(DateTime)
    time_col = Column(Time)
    uuid_col = Column(Uuid)

# This generates a CREATE TABLE statement against the catalog and schema
# specified in the connection string
Base.metadata.create_all(engine)
# Output SQL is:
# CREATE TABLE pysql_sqlalchemy_example_table (
#     bigint_col BIGINT NOT NULL,
#     string_col STRING,
#     tinyint_col SMALLINT,
#     int_col INT,
#     numeric_col DECIMAL(10, 2),
#     boolean_col BOOLEAN,
#     date_col DATE,
#     datetime_col TIMESTAMP,
#     datetime_col_ntz TIMESTAMP_NTZ,
#     time_col STRING,
#     uuid_col STRING,
#     PRIMARY KEY (bigint_col)
# ) USING DELTA
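
# As a quick check (added for illustration, not in the original example), SQLAlchemy's
# runtime inspection API can reflect the columns of the table we just created,
# assuming the dialect supports reflection for this table.
from sqlalchemy import inspect

inspector = inspect(engine)
for column_info in inspector.get_columns("pysql_sqlalchemy_example_table"):
    print(column_info["name"], column_info["type"])
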
# The code that follows INSERTs a record containing these values via the SQLAlchemy ORM,
# then SELECTs it back out. The output is compared to the input to demonstrate that
# all type information is preserved.
sample_object = {
    "bigint_col": 1234567890123456789,
    "string_col": "foo",
    "tinyint_col": -100,
    "int_col": 5280,
    "numeric_col": Decimal("525600.01"),
    "boolean_col": True,
    "date_col": date(2020, 12, 25),
    "datetime_col": datetime(
        1991, 8, 3, 21, 30, 5, tzinfo=timezone(timedelta(hours=-8))
    ),
    "datetime_col_ntz": datetime(1990, 12, 4, 6, 33, 41),
    "time_col": time(23, 59, 59),
    "uuid_col": UUID(int=255),
}
sa_obj = SampleObject(**sample_object)
session = Session(engine)
session.add(sa_obj)
session.commit()
# Output SQL is:
# INSERT INTO
#     pysql_sqlalchemy_example_table (
#         bigint_col,
#         string_col,
#         tinyint_col,
#         int_col,
#         numeric_col,
#         boolean_col,
#         date_col,
#         datetime_col,
#         datetime_col_ntz,
#         time_col,
#         uuid_col
#     )
# VALUES
#     (
#         :bigint_col,
#         :string_col,
#         :tinyint_col,
#         :int_col,
#         :numeric_col,
#         :boolean_col,
#         :date_col,
#         :datetime_col,
#         :datetime_col_ntz,
#         :time_col,
#         :uuid_col
#     )
# Here we build a SELECT query using the ORM
stmt = select(SampleObject).where(SampleObject.int_col == 5280)
# Then fetch one result with session.scalar()
result = session.scalar(stmt)
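
# (Added illustration) If the query could match more than one row, the standard
# SQLAlchemy 2.0 pattern is session.scalars(), which returns all matching ORM objects.
all_matches = session.scalars(stmt).all()
assert len(all_matches) == 1
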
# Finally, we read the fetched values back out and compare them to the input
compare = {key: getattr(result, key) for key in sample_object.keys()}
assert compare == sample_object
# Then we drop the demonstration table
Base.metadata.drop_all(engine)
# Output SQL is:
# DROP TABLE pysql_sqlalchemy_example_table