Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the Trino client spooling protocol #496

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,30 @@ conn = connect(
)
```

## Spooled protocol

The client spooling protocol requires [a Trino server with spooling protocol support](https://trino.io/docs/current/client/client-protocol.html#spooling-protocol).

Enable the spooling protocol by specifying a supported encoding in the `encoding` parameter:

```python
from trino.dbapi import connect

conn = connect(
encoding="json+zstd"
)
```

or a list of supported encodings:

```python
from trino.dbapi import connect

conn = connect(
encoding=["json+zstd", "json"]
)
```

## Transactions

The client runs by default in *autocommit* mode. To enable transactions, set
Expand Down
1 change: 1 addition & 0 deletions etc/catalog/jmx.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=jmx
1 change: 1 addition & 0 deletions etc/catalog/memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=memory
1 change: 1 addition & 0 deletions etc/catalog/tpcds.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpcds
2 changes: 2 additions & 0 deletions etc/catalog/tpch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
connector.name=tpch
tpch.splits-per-node=4
11 changes: 11 additions & 0 deletions etc/config-pre-466.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
node.id=coordinator
node.environment=test

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# Disable http request log
http-server.log.enabled=false
17 changes: 17 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
node.id=coordinator
node.environment=test

coordinator=true
experimental.concurrent-startup=true
mdesmet marked this conversation as resolved.
Show resolved Hide resolved
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# spooling protocol settings
protocol.spooling.enabled=true
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
protocol.spooling.retrieval-mode=coordinator_proxy

# Disable http request log
http-server.log.enabled=false
16 changes: 16 additions & 0 deletions etc/jvm-pre-466.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
17 changes: 17 additions & 0 deletions etc/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
--enable-native-access=ALL-UNNAMED
8 changes: 8 additions & 0 deletions etc/spooling-manager.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
spooling-manager.name=filesystem
fs.s3.enabled=true
fs.location=s3://spooling/
s3.endpoint=http://localstack:4566/
s3.region=us-east-1
s3.aws-access-key=test
s3.aws-secret-key=test
s3.path-style-access=true
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
"pre-commit",
"black",
"isort",
"keyring"
"keyring",
"testcontainers",
"boto3"
]

setup(
Expand Down Expand Up @@ -81,11 +83,13 @@
],
python_requires=">=3.9",
install_requires=[
"lz4",
"python-dateutil",
"pytz",
# requests CVE https://github.com/advisories/GHSA-j8r2-6x86-q33q
"requests>=2.31.0",
"tzlocal",
"zstandard",
],
extras_require={
"all": all_require,
Expand Down
127 changes: 127 additions & 0 deletions tests/development_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import time
from contextlib import contextmanager
from pathlib import Path

from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.localstack import LocalStackContainer

from trino.constants import DEFAULT_PORT

MINIO_ROOT_USER = "minio-access-key"
MINIO_ROOT_PASSWORD = "minio-secret-key"

TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
TRINO_HOST = "localhost"


mdesmet marked this conversation as resolved.
Show resolved Hide resolved
def create_bucket(s3_client):
bucket_name = "spooling"
wendigo marked this conversation as resolved.
Show resolved Hide resolved
try:
print("Checking for bucket existence...")
response = s3_client.list_buckets()
buckets = [bucket["Name"] for bucket in response["Buckets"]]
if bucket_name in buckets:
print("Bucket exists!")
return
except s3_client.exceptions.ClientError as e:
if not e.response['Error']['Code'] == '404':
print("An error occurred:", e)
return

try:
print("Creating bucket...")
s3_client.create_bucket(
Bucket=bucket_name,
)
print("Bucket created!")
except s3_client.exceptions.ClientError as e:
print("An error occurred:", e)


@contextmanager
def start_development_server(port=None, trino_version=TRINO_VERSION):
network = None
localstack = None
trino = None

try:
network = Network().create()
supports_spooling_protocol = TRINO_VERSION == "latest" or int(TRINO_VERSION) >= 466
if supports_spooling_protocol:
localstack = LocalStackContainer(image="localstack/localstack:latest", region_name="us-east-1") \
.with_name("localstack") \
.with_network(network) \
.with_bind_ports(4566, 4566) \
.with_bind_ports(4571, 4571) \
.with_env("SERVICES", "s3")

# Start the container
print("Starting LocalStack container...")
localstack.start()

# Wait for logs indicating MinIO has started
wait_for_logs(localstack, "Ready.", timeout=30)

# create spooling bucket
create_bucket(localstack.get_client("s3"))

trino = DockerContainer(f"trinodb/trino:{trino_version}") \
.with_name("trino") \
.with_network(network) \
.with_env("TRINO_CONFIG_DIR", "/etc/trino") \
.with_bind_ports(DEFAULT_PORT, port)

root = Path(__file__).parent.parent

trino = trino \
.with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog")

# Enable spooling config
if supports_spooling_protocol:
trino \
.with_volume_mapping(
str(root / "etc/spooling-manager.properties"),
"/etc/trino/spooling-manager.properties", "rw") \
.with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config") \
.with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties")
else:
trino \
.with_volume_mapping(str(root / "etc/jvm-pre-466.config"), "/etc/trino/jvm.config") \
.with_volume_mapping(str(root / "etc/config-pre-466.properties"), "/etc/trino/config.properties")

print("Starting Trino container...")
trino.start()

# Wait for logs indicating the service has started
wait_for_logs(trino, "SERVER STARTED", timeout=60)

# Otherwise some tests fail with No nodes available
time.sleep(2)

yield localstack, trino, network
finally:
# Stop containers when exiting the context
if trino:
print("Stopping Trino container...")
trino.stop()
if localstack:
print("Stopping LocalStack container...")
localstack.stop()
if network:
network.remove()


def main():
"""Run Trino setup independently from pytest."""
with start_development_server(port=DEFAULT_PORT):
print(f"Trino started at {TRINO_HOST}:{DEFAULT_PORT}")

# Keep the process running so that the containers stay up
input("Press Enter to stop containers...")


if __name__ == "__main__":
main()
Loading
Loading