diff --git a/.gitignore b/.gitignore
index 110fabc..ba12498 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@ dist
 _version.py
 .idea/
 venv/
+.venv/
 tmp/
 .vscode/
 build/
diff --git a/.pylintrc b/.pylintrc
index 0f486f9..739330d 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,4 +1,5 @@
 [MASTER]
-disable=fixme,logging-fstring-interpolation,too-many-positional-arguments
+disable=fixme,logging-fstring-interpolation
 [DESIGN]
 max-args=10
+max-attributes=7
diff --git a/dune_client/api/base.py b/dune_client/api/base.py
index 79abddf..cdc4aa2 100644
--- a/dune_client/api/base.py
+++ b/dune_client/api/base.py
@@ -9,7 +9,7 @@ import logging.config
 import os
 from json import JSONDecodeError
 
-from typing import Any, Dict, List, Optional, Union, IO
+from typing import Any, Dict, Optional, IO
 
 from requests import Response, Session
 from requests.adapters import HTTPAdapter, Retry
@@ -30,7 +30,7 @@ class BaseDuneClient:
     and provides some convenient functions to use in other clients
     """
 
-    def __init__(  # pylint: disable=too-many-arguments
+    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
         self,
         api_key: str,
         base_url: str = "https://api.dune.com",
@@ -83,51 +83,6 @@ def default_headers(self) -> Dict[str, str]:
             "User-Agent": f"dune-client/{client_version} (https://pypi.org/project/dune-client/)",
         }
 
-    ############
-    # Utilities:
-    ############
-
-    def _build_parameters(
-        self,
-        params: Optional[Dict[str, Union[str, int]]] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        allow_partial_results: str = "true",
-    ) -> Dict[str, Union[str, int]]:
-        """
-        Utility function that builds a dictionary of parameters to be used
-        when retrieving advanced results (filters, pagination, sorting, etc.).
-        This is shared between the sync and async client.
- """ - # Ensure we don't specify parameters that are incompatible: - assert ( - # We are not sampling - sample_count is None - # We are sampling and don't use filters or pagination - or (limit is None and offset is None and filters is None) - ), "sampling cannot be combined with filters or pagination" - - params = params or {} - params["allow_partial_results"] = allow_partial_results - if columns is not None and len(columns) > 0: - params["columns"] = ",".join(columns) - if sample_count is not None: - params["sample_count"] = sample_count - if filters is not None: - params["filters"] = filters - if sort_by is not None and len(sort_by) > 0: - params["sort_by"] = ",".join(sort_by) - if limit is not None: - params["limit"] = limit - if offset is not None: - params["offset"] = offset - - return params - class BaseRouter(BaseDuneClient): """Extending the Base Client with elementary api routing""" diff --git a/dune_client/api/custom.py b/dune_client/api/custom.py index 3809036..b99add2 100644 --- a/dune_client/api/custom.py +++ b/dune_client/api/custom.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from typing import List, Optional +from typing import List, NamedTuple, Optional from dune_client.api.base import BaseRouter from dune_client.models import ( @@ -13,6 +13,19 @@ ) +class CustomAPIParams(NamedTuple): + """ + Params for Custom Endpoint API Function + """ + + limit: Optional[int] = None + offset: Optional[int] = None + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + + # pylint: disable=duplicate-code class CustomEndpointAPI(BaseRouter): """ @@ -25,12 +38,7 @@ def get_custom_endpoint_result( self, handle: str, endpoint: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[CustomAPIParams] = None, ) -> ResultsResponse: """ Custom endpoints allow you to fetch and filter data from any @@ -48,17 +56,12 @@ def get_custom_endpoint_result( filters (str, optional): The filters to apply. sort_by (List[str], optional): The columns to sort by. """ - params = self._build_parameters( - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - limit=limit, - offset=offset, - ) + if params is None: + params = CustomAPIParams() + response_json = self._get( route=f"/endpoints/{handle}/{endpoint}/results", - params=params, + params=params._asdict(), ) try: return ResultsResponse.from_dict(response_json) diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index e15e6bd..825d091 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -7,7 +7,7 @@ """ from io import BytesIO -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, NamedTuple, Optional from deprecated import deprecated @@ -27,6 +27,21 @@ from dune_client.query import QueryBase +class GetExecutionResultsParams(NamedTuple): + """ + Parameters for get execution result functions + """ + + limit: Optional[int] = None + columns: Optional[List[str]] = None + batch_size: Optional[int] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + offset: Optional[int] = None + allow_partial_results: str = "true" + + class ExecutionAPI(BaseRouter): """ Query execution and result fetching functions. 
@@ -75,38 +90,19 @@ def get_execution_status(self, job_id: str) -> ExecutionStatusResponse:
     def get_execution_results(
         self,
         job_id: str,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
-        allow_partial_results: str = "true",
+        params: Optional[GetExecutionResultsParams] = None,
     ) -> ResultsResponse:
         """GET results from Dune API for `job_id` (aka `execution_id`)"""
-        params = self._build_parameters(
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=limit,
-            offset=offset,
-            allow_partial_results=allow_partial_results,
-        )
+
+        if params is None:
+            params = GetExecutionResultsParams()
 
         route = f"/execution/{job_id}/results"
         url = self._route_url(route)
-        return self._get_execution_results_by_url(url=url, params=params)
+        return self._get_execution_results_by_url(url=url, params=params._asdict())
 
     def get_execution_results_csv(
-        self,
-        job_id: str,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        filters: Optional[str] = None,
-        sample_count: Optional[int] = None,
-        sort_by: Optional[List[str]] = None,
+        self, job_id: str, params: Optional[GetExecutionResultsParams] = None
     ) -> ExecutionResultCSV:
         """
         GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
@@ -115,18 +111,13 @@ def get_execution_results_csv(
         use this method for large results where you want lower CPU and memory overhead
         if you need metadata information use get_results() or get_status()
         """
-        params = self._build_parameters(
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=limit,
-            offset=offset,
-        )
+
+        if params is None:
+            params = GetExecutionResultsParams()
 
         route = f"/execution/{job_id}/results/csv"
         url = self._route_url(route)
-        return self._get_execution_results_csv_by_url(url=url, params=params)
+        return self._get_execution_results_csv_by_url(url=url, params=params._asdict())
 
     def _get_execution_results_by_url(
         self, url: str, params: Optional[Dict[str, Any]] = None
diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py
index 44fda20..e1c7107 100644
--- a/dune_client/api/extensions.py
+++ b/dune_client/api/extensions.py
@@ -8,7 +8,7 @@
 import time
 from io import BytesIO
 
-from typing import Any, List, Optional, Union
+from typing import Any, List, NamedTuple, Optional, Union
 
 from deprecated import deprecated
 
@@ -17,7 +17,7 @@
     DUNE_CSV_NEXT_OFFSET_HEADER,
     MAX_NUM_ROWS_PER_BATCH,
 )
-from dune_client.api.execution import ExecutionAPI
+from dune_client.api.execution import ExecutionAPI, GetExecutionResultsParams
 from dune_client.api.query import QueryAPI
 from dune_client.api.table import TableAPI
 from dune_client.api.custom import CustomEndpointAPI
@@ -29,6 +29,7 @@
     ExecutionResultCSV,
 )
 from dune_client.query import QueryBase, parse_query_object_or_id
+
 from dune_client.types import QueryParameter
 from dune_client.util import age_in_hours
 
@@ -38,6 +39,34 @@
 POLL_FREQUENCY_SECONDS = 1
 
 
+class RunQueryParams(NamedTuple):
+    "Params for run query function"
+    performance: Optional[str] = None
+    batch_size: Optional[int] = None
+    columns: Optional[List[str]] = None
+    sample_count: Optional[int] = None
+    filters: Optional[str] = None
+    sort_by: Optional[List[str]] = None
+
+
+class GetLatestResultParams(NamedTuple):
+    "Params for get latest functions"
+    batch_size: Optional[int] = None
+    columns: Optional[List[str]] = None
+    sample_count: Optional[int] = None
+    filters: Optional[str] = None
+    sort_by: Optional[List[str]] = None
+
+
+class RunSQLParams(NamedTuple):
+    "Params for Run SQL function"
+    query_params: Optional[list[QueryParameter]] = None
+    is_private: bool = True
+    archive_after: bool = True
+    performance: Optional[str] = None
+    ping_frequency: int = POLL_FREQUENCY_SECONDS
+
+
 class ExtendedAPI(ExecutionAPI, QueryAPI, TableAPI, CustomEndpointAPI):
     """
     Provides higher level helper methods for faster
@@ -48,13 +77,8 @@ def run_query(
         self,
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
-        performance: Optional[str] = None,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
         allow_partial_results: str = "true",
+        params: Optional[RunQueryParams] = None,
     ) -> ResultsResponse:
         """
         Executes a Dune `query`, waits until execution completes,
@@ -62,29 +86,35 @@ def run_query(
         Sleeps `ping_frequency` seconds between each status request.
         """
         # Ensure we don't specify parameters that are incompatible:
+        if params is None:
+            params = RunQueryParams()
+
         assert (
             # We are not sampling
-            sample_count is None
+            params.sample_count is None
             # We are sampling and don't use filters or pagination
-            or (batch_size is None and filters is None)
+            or (params.batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        if sample_count is not None:
+        if params.sample_count is not None:
             limit = None
         else:
-            limit = batch_size or MAX_NUM_ROWS_PER_BATCH
+            limit = params.batch_size or MAX_NUM_ROWS_PER_BATCH
 
         # pylint: disable=duplicate-code
-        job_id = self._refresh(query, ping_frequency, performance)
+        job_id = self._refresh(query, ping_frequency, params.performance)
+        params = GetExecutionResultsParams(
+            limit=limit,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+            allow_partial_results=allow_partial_results,
+        )
         return self._fetch_entire_result(
             self.get_execution_results(
                 job_id,
-                columns=columns,
-                sample_count=sample_count,
-                filters=filters,
-                sort_by=sort_by,
-                limit=limit,
-                allow_partial_results=allow_partial_results,
+                params=params,
             ),
         )
@@ -92,12 +122,7 @@ def run_query_csv(
         self,
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
-        performance: Optional[str] = None,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[RunQueryParams] = None,
     ) -> ExecutionResultCSV:
         """
         Executes a Dune query, waits till execution completes,
@@ -105,28 +130,33 @@ def run_query_csv(
         (use it load the data directly in pandas.from_csv() or similar frameworks)
         """
         # Ensure we don't specify parameters that are incompatible:
+        if params is None:
+            params = RunQueryParams()
         assert (
             # We are not sampling
-            sample_count is None
+            params.sample_count is None
             # We are sampling and don't use filters or pagination
-            or (batch_size is None and filters is None)
+            or (params.batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        if sample_count is not None:
+        if params.sample_count is not None:
             limit = None
         else:
-            limit = batch_size or MAX_NUM_ROWS_PER_BATCH
+            limit = params.batch_size or MAX_NUM_ROWS_PER_BATCH
 
         # pylint: disable=duplicate-code
-        job_id = self._refresh(query, ping_frequency, performance)
+        job_id = self._refresh(query, ping_frequency, params.performance)
+        params = GetExecutionResultsParams(
+            limit=limit,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+        )
         return self._fetch_entire_result_csv(
             self.get_execution_results_csv(
                 job_id,
-                columns=columns,
-                sample_count=sample_count,
-                filters=filters,
-                sort_by=sort_by,
-                limit=limit,
+                params=params,
             ),
         )
 
@@ -134,12 +164,7 @@ def run_query_dataframe(
         self,
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
-        performance: Optional[str] = None,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[RunQueryParams] = None,
     ) -> Any:
         """
         Execute a Dune Query, waits till execution completes,
@@ -147,21 +172,27 @@ def run_query_dataframe(
         This is a convenience method that uses run_query_csv() + pandas.read_csv()
         underneath
         """
+        if params is None:
+            params = RunQueryParams()
+
         try:
             import pandas  # pylint: disable=import-outside-toplevel
         except ImportError as exc:
             raise ImportError(
                 "dependency failure, pandas is required but missing"
             ) from exc
+        params = RunQueryParams(
+            performance=params.performance,
+            batch_size=params.batch_size,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+        )
         data = self.run_query_csv(
             query,
             ping_frequency,
-            performance,
-            batch_size=batch_size,
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
+            params=params,
         ).data
         return pandas.read_csv(data)
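
With `run_query` and its CSV/DataFrame variants now sharing `RunQueryParams`, a call that previously spread pagination and filtering across keyword arguments becomes the following, sketched here against a hypothetical query id:

```python
from dune_client.api.extensions import RunQueryParams
from dune_client.client import DuneClient
from dune_client.query import QueryBase

client = DuneClient(api_key="...")  # assumes a valid Dune API key
query = QueryBase(1234567)  # hypothetical query id

# Old style: client.run_query(query, batch_size=1000, filters="number < 3")
params = RunQueryParams(batch_size=1000, filters="number < 3")
results = client.run_query(query, params=params)
```
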
@@ -169,11 +200,7 @@ def get_latest_result(
         self,
         query: Union[QueryBase, str, int],
         max_age_hours: int = THREE_MONTHS_IN_HOURS,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[GetLatestResultParams] = None,
     ) -> ResultsResponse:
         """
         GET the latest results for a query_id without re-executing the query
@@ -184,26 +211,30 @@ def get_latest_result(
         https://docs.dune.com/api-reference/executions/endpoint/get-query-result
         """
         # Ensure we don't specify parameters that are incompatible:
+        if params is None:
+            params = GetLatestResultParams()
+
+        batch_size = params.batch_size
         assert (
             # We are not sampling
-            sample_count is None
+            params.sample_count is None
             # We are sampling and don't use filters or pagination
-            or (batch_size is None and filters is None)
+            or (batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        params, query_id = parse_query_object_or_id(query)
+        get_params, query_id = parse_query_object_or_id(query)
 
         # Only fetch 1 row to get metadata first to determine if the result is fresh enough
-        if params is None:
-            params = {}
-        params["limit"] = 1
+        if get_params is None:
+            get_params = {}
+        get_params["limit"] = 1
 
         response_json = self._get(
             route=f"/query/{query_id}/results",
-            params=params,
+            params=get_params,
         )
         try:
-            if sample_count is None and batch_size is None:
+            if params.sample_count is None and batch_size is None:
                 batch_size = MAX_NUM_ROWS_PER_BATCH
             metadata = ResultsResponse.from_dict(response_json)
             last_run = metadata.times.execution_ended_at
@@ -213,25 +244,33 @@ def get_latest_result(
                 logging.info(
                     f"results (from {last_run}) older than {max_age_hours} hours, re-running query"
                 )
-                results = self.run_query(
-                    query if isinstance(query, QueryBase) else QueryBase(query_id),
-                    columns=columns,
-                    sample_count=sample_count,
-                    filters=filters,
-                    sort_by=sort_by,
+                params = RunQueryParams(
                     batch_size=batch_size,
+                    columns=params.columns,
+                    sample_count=params.sample_count,
+                    filters=params.filters,
+                    sort_by=params.sort_by,
+                )
+                results = self.run_query(
+                    query=(
+                        query if isinstance(query, QueryBase) else QueryBase(query_id)
+                    ),
+                    params=params,
                 )
             else:
                 # The results are fresh enough, retrieve the entire result
                 # pylint: disable=duplicate-code
+                params = GetExecutionResultsParams(
+                    batch_size=batch_size,
+                    columns=params.columns,
+                    sample_count=params.sample_count,
+                    filters=params.filters,
+                    sort_by=params.sort_by,
+                )
                 results = self._fetch_entire_result(
                     self.get_execution_results(
                         metadata.execution_id,
-                        columns=columns,
-                        sample_count=sample_count,
-                        filters=filters,
-                        sort_by=sort_by,
-                        limit=batch_size,
+                        params=params,
                     ),
                 )
             return results
@@ -241,11 +280,7 @@ def get_latest_result(
     def get_latest_result_dataframe(
         self,
         query: Union[QueryBase, str, int],
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[GetLatestResultParams] = None,
     ) -> Any:
         """
         GET the latest results for a query_id without re-executing the query
@@ -254,56 +289,55 @@ def get_latest_result_dataframe(
         This is a convenience method that uses get_latest_result() + pandas.read_csv()
         underneath
         """
+        if params is None:
+            params = GetLatestResultParams()
         try:
             import pandas  # pylint: disable=import-outside-toplevel
         except ImportError as exc:
             raise ImportError(
                 "dependency failure, pandas is required but missing"
             ) from exc
-
+        params = GetLatestResultParams(
+            batch_size=params.batch_size,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+        )
         results = self.download_csv(
             query,
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            batch_size=batch_size,
+            params=params,
         )
         return pandas.read_csv(results.data)
 
     def download_csv(
         self,
         query: Union[QueryBase, str, int],
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[GetLatestResultParams] = None,
     ) -> ExecutionResultCSV:
         """
         Almost like an alias for `get_latest_result` but for the csv endpoint.
         https://docs.dune.com/api-reference/executions/endpoint/get-query-result-csv
         """
         # Ensure we don't specify parameters that are incompatible:
+        if params is None:
+            params = GetLatestResultParams()
+
         assert (
             # We are not sampling
-            sample_count is None
+            params.sample_count is None
             # We are sampling and don't use filters or pagination
-            or (batch_size is None and filters is None)
+            or (params.batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        params, query_id = parse_query_object_or_id(query)
+        get_params, query_id = parse_query_object_or_id(query)
 
-        params = self._build_parameters(
-            params=params,
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=batch_size,
-        )
-        if sample_count is None and batch_size is None:
-            params["limit"] = MAX_NUM_ROWS_PER_BATCH
+        if params.sample_count is None and params.batch_size is None:
+            if get_params is None:
+                get_params = {}
+            get_params["limit"] = MAX_NUM_ROWS_PER_BATCH
+
+        params = params._asdict()
+        params["params"] = get_params
 
         response = self._get(
             route=f"/query/{query_id}/results/csv", params=params, raw=True
@@ -327,12 +361,8 @@ def download_csv(
     def run_sql(
         self,
         query_sql: str,
-        params: Optional[list[QueryParameter]] = None,
-        is_private: bool = True,
-        archive_after: bool = True,
-        performance: Optional[str] = None,
-        ping_frequency: int = POLL_FREQUENCY_SECONDS,
         name: str = "API Query",
+        params: Optional[RunSQLParams] = None,
     ) -> ResultsResponse:
         """
         Allows user to provide execute raw_sql via the CRUD interface
@@ -340,13 +370,21 @@ def run_sql(
         - Query is by default made private and archived after execution.
         Requires Plus subscription!
         """
-        query = self.create_query(name, query_sql, params, is_private)
+        if params is None:
+            params = RunSQLParams()
+
+        query = self.create_query(
+            name, query_sql, params.query_params, params.is_private
+        )
+        run_query_params = RunQueryParams(performance=params.performance)
         try:
             results = self.run_query(
-                query=query.base, performance=performance, ping_frequency=ping_frequency
+                query=query.base,
+                ping_frequency=params.ping_frequency,
+                params=run_query_params,
             )
         finally:
-            if archive_after:
+            if params.archive_after:
                 self.archive_query(query.base.query_id)
         return results
 
@@ -365,7 +403,12 @@ def refresh(
         fetches and returns the results.
         Sleeps `ping_frequency` seconds between each status request.
""" - return self.run_query(query, ping_frequency, performance) + params = RunQueryParams(performance=performance) + return self.run_query( + query=query, + ping_frequency=ping_frequency, + params=params, + ) @deprecated(version="1.2.1", reason="Please use run_query_csv") def refresh_csv( @@ -379,7 +422,8 @@ def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - return self.run_query_csv(query, ping_frequency, performance) + params = RunQueryParams(performance=performance) + return self.run_query_csv(query, ping_frequency, params=params) @deprecated(version="1.2.1", reason="Please use run_query_dataframe") def refresh_into_dataframe( @@ -394,7 +438,8 @@ def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ - return self.run_query_dataframe(query, ping_frequency, performance) + params = RunQueryParams(performance=performance) + return self.run_query_dataframe(query, ping_frequency, params=params) ################# # Private Methods diff --git a/dune_client/api/query.py b/dune_client/api/query.py index 56ddcc1..8737b7d 100644 --- a/dune_client/api/query.py +++ b/dune_client/api/query.py @@ -6,7 +6,7 @@ """ from __future__ import annotations -from typing import Optional, Any +from typing import NamedTuple, Optional, Any from dune_client.api.base import BaseRouter from dune_client.models import DuneError @@ -14,6 +14,15 @@ from dune_client.types import QueryParameter +class UpdateQueryParams(NamedTuple): + "Params for Update Query function" + name: Optional[str] = None + query_sql: Optional[str] = None + query_params: Optional[list[QueryParameter]] = None + description: Optional[str] = None + tags: Optional[list[str]] = None + + class QueryAPI(BaseRouter): """ Implementation of Query API (aka CRUD) Operations - Plus subscription only @@ -54,14 +63,10 @@ def get_query(self, query_id: int) -> DuneQuery: response_json = self._get(route=f"/query/{query_id}") return DuneQuery.from_dict(response_json) - def update_query( # pylint: disable=too-many-arguments + def update_query( self, query_id: int, - name: Optional[str] = None, - query_sql: Optional[str] = None, - params: Optional[list[QueryParameter]] = None, - description: Optional[str] = None, - tags: Optional[list[str]] = None, + params: Optional[UpdateQueryParams] = None, ) -> int: """ Updates Dune Query by ID @@ -72,17 +77,20 @@ def update_query( # pylint: disable=too-many-arguments If the tags or parameters are provided as an empty array, they will be deleted from the query. 
""" + if params is None: + params = UpdateQueryParams() + parameters: dict[str, Any] = {} - if name is not None: - parameters["name"] = name - if description is not None: - parameters["description"] = description - if tags is not None: - parameters["tags"] = tags - if query_sql is not None: - parameters["query_sql"] = query_sql - if params is not None: - parameters["parameters"] = [p.to_dict() for p in params] + if params.name is not None: + parameters["name"] = params.name + if params.description is not None: + parameters["description"] = params.description + if params.tags is not None: + parameters["tags"] = params.tags + if params.query_sql is not None: + parameters["query_sql"] = params.query_sql + if params.query_params is not None: + parameters["parameters"] = [p.to_dict() for p in params.query_params] if not bool(parameters): # Nothing to change no need to make reqeust diff --git a/dune_client/api/table.py b/dune_client/api/table.py index 239d0fc..f5c66f9 100644 --- a/dune_client/api/table.py +++ b/dune_client/api/table.py @@ -52,7 +52,7 @@ def upload_csv( except KeyError as err: raise DuneError(response_json, "UploadCsvResponse", err) from err - def create_table( + def create_table( # pylint: disable=too-many-instance-attributes, too-many-positional-arguments self, namespace: str, table_name: str, diff --git a/dune_client/client_async.py b/dune_client/client_async.py index 39e24fa..e1ccb40 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -9,7 +9,7 @@ import asyncio import ssl from io import BytesIO -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, NamedTuple, Optional, Union import certifi from aiohttp import ( @@ -40,6 +40,45 @@ from dune_client.query import QueryBase, parse_query_object_or_id +class GetResultParams(NamedTuple): + """ + Parameters for get reult functions + """ + + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + batch_size: Optional[int] = None + + +class RefreshParams(NamedTuple): + """ + Parameters for refresh functions + """ + + performance: Optional[str] = None + batch_size: Optional[int] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + columns: Optional[List[str]] = None + + +class ResultPageParams(NamedTuple): + """ + Parameters for result page functions + """ + + columns: Optional[List[str]] = None + sample_count: Optional[int] = None + filters: Optional[str] = None + sort_by: Optional[List[str]] = None + limit: Optional[int] = None + offset: Optional[int] = None + batch_size: Optional[int] = None + + class RetryableError(Exception): """ Internal exception used to signal that the request should be retried @@ -232,30 +271,34 @@ async def get_status(self, job_id: str) -> ExecutionStatusResponse: async def get_result( self, job_id: str, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[GetResultParams] = None, ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" + if params is None: + params = GetResultParams() + + batch_size = params.batch_size + assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) 
+            or (batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        if sample_count is None and batch_size is None:
+        if params.sample_count is None and batch_size is None:
             batch_size = MAX_NUM_ROWS_PER_BATCH
 
+        result_page_params = ResultPageParams(
+            batch_size=batch_size,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+        )
         results = await self._get_result_page(
             job_id,
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=batch_size,
+            params=result_page_params,
         )
         while results.next_uri is not None:
             batch = await self._get_result_by_url(results.next_uri)
@@ -266,11 +309,7 @@ async def get_result(
     async def get_result_csv(
         self,
         job_id: str,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[GetResultParams] = None,
     ) -> ExecutionResultCSV:
         """
         GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
@@ -279,23 +318,31 @@ async def get_result_csv(
         use this method for large results where you want lower CPU and memory overhead
         if you need metadata information use get_results() or get_status()
         """
+        if params is None:
+            params = GetResultParams()
+
+        batch_size = params.batch_size
+
         assert (
             # We are not sampling
-            sample_count is None
+            params.sample_count is None
             # We are sampling and don't use filters or pagination
-            or (batch_size is None and filters is None)
+            or (batch_size is None and params.filters is None)
         ), "sampling cannot be combined with filters or pagination"
 
-        if sample_count is None and batch_size is None:
+        if params.sample_count is None and batch_size is None:
             batch_size = MAX_NUM_ROWS_PER_BATCH
 
+        params = ResultPageParams(
+            batch_size=batch_size,
+            columns=params.columns,
+            sample_count=params.sample_count,
+            filters=params.filters,
+            sort_by=params.sort_by,
+        )
         results = await self._get_result_csv_page(
             job_id,
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=batch_size,
+            params=params,
         )
         while results.next_uri is not None:
             batch = await self._get_result_csv_by_url(results.next_uri)
@@ -356,82 +403,80 @@ async def cancel_execution(self, job_id: str) -> bool:
     async def refresh(
         self,
         query: QueryBase,
+        params: Optional[RefreshParams] = None,
         ping_frequency: int = 5,
-        performance: Optional[str] = None,
-        batch_size: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
     ) -> ResultsResponse:
         """
         Executes a Dune `query`, waits until execution completes,
         fetches and returns the results.
         Sleeps `ping_frequency` seconds between each status request.
""" + + if params is None: + params = RefreshParams() + assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" job_id = await self._refresh( - query, ping_frequency=ping_frequency, performance=performance + query, ping_frequency=ping_frequency, performance=params.performance + ) + params = GetResultParams( + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) return await self.get_result( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + params=params, ) async def refresh_csv( self, query: QueryBase, + params: Optional[RefreshParams] = None, ping_frequency: int = 5, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, ) -> ExecutionResultCSV: """ Executes a Dune query, waits till execution completes, fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ + if params is None: + params = RefreshParams() + assert ( # We are not sampling - sample_count is None + params.sample_count is None # We are sampling and don't use filters or pagination - or (batch_size is None and filters is None) + or (params.batch_size is None and params.filters is None) ), "sampling cannot be combined with filters or pagination" job_id = await self._refresh( - query, ping_frequency=ping_frequency, performance=performance + query, ping_frequency=ping_frequency, performance=params.performance + ) + get_result_params = GetResultParams( + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, ) return await self.get_result_csv( job_id, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + params=get_result_params, ) async def refresh_into_dataframe( self, query: QueryBase, - performance: Optional[str] = None, - batch_size: Optional[int] = None, - columns: Optional[List[str]] = None, - sample_count: Optional[int] = None, - filters: Optional[str] = None, - sort_by: Optional[List[str]] = None, + params: Optional[RefreshParams] = None, ) -> Any: """ Execute a Dune Query, waits till execution completes, @@ -439,20 +484,26 @@ async def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ + if params is None: + params = RefreshParams() + try: import pandas # pylint: disable=import-outside-toplevel except ImportError as exc: raise ImportError( "dependency failure, pandas is required but missing" ) from exc + params = RefreshParams( + performance=params.performance, + batch_size=params.batch_size, + columns=params.columns, + sample_count=params.sample_count, + filters=params.filters, + sort_by=params.sort_by, + ) results = await self.refresh_csv( - query, - performance=performance, - columns=columns, - sample_count=sample_count, - filters=filters, - sort_by=sort_by, - batch_size=batch_size, + query=query, + params=params, ) return pandas.read_csv(results.data) @@ -463,27 +514,24 @@ async def refresh_into_dataframe( 
@@ -463,27 +514,24 @@
     async def _get_result_page(
         self,
         job_id: str,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        params: Optional[ResultPageParams] = None,
     ) -> ResultsResponse:
         """GET a page of results from Dune API for `job_id` (aka `execution_id`)"""
+        if params is None:
+            params = ResultPageParams()
+
+        limit = params.limit
+        offset = params.offset
 
-        if sample_count is None and limit is None and offset is None:
+        if params.sample_count is None and limit is None and offset is None:
             limit = MAX_NUM_ROWS_PER_BATCH
             offset = 0
 
-        params = self._build_parameters(
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=limit,
-            offset=offset,
-        )
+        params = params._asdict()
+
+        params["limit"] = limit
+        params["offset"] = offset
+
         response_json = await self._get(
             route=f"/execution/{job_id}/results",
             params=params,
@@ -510,31 +558,25 @@ async def _get_result_by_url(
             raise DuneError(response_json, "ResultsResponse", err) from err
 
     async def _get_result_csv_page(
-        self,
-        job_id: str,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        columns: Optional[List[str]] = None,
-        sample_count: Optional[int] = None,
-        filters: Optional[str] = None,
-        sort_by: Optional[List[str]] = None,
+        self, job_id: str, params: Optional[ResultPageParams] = None
     ) -> ExecutionResultCSV:
         """
         GET a page of results in CSV format from Dune API for `job_id` (aka `execution_id`)
         """
+        if params is None:
+            params = ResultPageParams()
 
-        if sample_count is None and limit is None and offset is None:
+        limit = params.limit
+        offset = params.offset
+
+        if params.sample_count is None and limit is None and offset is None:
             limit = MAX_NUM_ROWS_PER_BATCH
             offset = 0
 
-        params = self._build_parameters(
-            columns=columns,
-            sample_count=sample_count,
-            filters=filters,
-            sort_by=sort_by,
-            limit=limit,
-            offset=offset,
-        )
+        params = params._asdict()
+
+        params["limit"] = limit
+        params["offset"] = offset
 
         route = f"/execution/{job_id}/results/csv"
         response = await self._get(route=route, params=params, raw=True)
diff --git a/dune_client/models.py b/dune_client/models.py
index b975edb..044be72 100644
--- a/dune_client/models.py
+++ b/dune_client/models.py
@@ -297,7 +297,7 @@ def __add__(self, other: ExecutionResult) -> ExecutionResult:
 
 
 @dataclass
-class ResultsResponse:
+class ResultsResponse:  # pylint: disable=too-many-instance-attributes
     """
     Representation of Response from Dune's [Get] Query Results endpoint
     """
diff --git a/tests/e2e/test_async_client.py b/tests/e2e/test_async_client.py
index 1d9b97f..c7833dc 100644
--- a/tests/e2e/test_async_client.py
+++ b/tests/e2e/test_async_client.py
@@ -5,7 +5,7 @@
 import dotenv
 import pandas
 
-from dune_client.client_async import AsyncDuneClient
+from dune_client.client_async import AsyncDuneClient, RefreshParams
 from dune_client.query import QueryBase
 
 
@@ -42,7 +42,10 @@ async def test_refresh_with_pagination(self):
         # Arrange
         async with AsyncDuneClient(self.valid_api_key) as cl:
             # Act
-            results = (await cl.refresh(self.multi_rows_query, batch_size=1)).get_rows()
+            params = RefreshParams(batch_size=1)
+            results = (
+                await cl.refresh(self.multi_rows_query, params=params)
+            ).get_rows()
 
             # Assert
             self.assertEqual(
@@ -60,8 +63,9 @@ async def test_refresh_with_filters(self):
         # Arrange
         async with AsyncDuneClient(self.valid_api_key) as cl:
             # Act
+            params = RefreshParams(filters="number < 3")
             results = (
-                await cl.refresh(self.multi_rows_query, filters="number < 3")
+                await cl.refresh(self.multi_rows_query, params=params)
             ).get_rows()
 
             # Assert
@@ -77,7 +81,8 @@ async def test_refresh_csv_with_pagination(self):
         # Arrange
         async with AsyncDuneClient(self.valid_api_key) as cl:
             # Act
-            result_csv = await cl.refresh_csv(self.multi_rows_query, batch_size=1)
+            params = RefreshParams(batch_size=1)
+            result_csv = await cl.refresh_csv(self.multi_rows_query, params=params)
 
             # Assert
             self.assertEqual(
@@ -95,9 +100,8 @@ async def test_refresh_csv_with_filters(self):
         # Arrange
         async with AsyncDuneClient(self.valid_api_key) as cl:
             # Act
-            result_csv = await cl.refresh_csv(
-                self.multi_rows_query, filters="number < 3"
-            )
+            params = RefreshParams(filters="number < 3")
+            result_csv = await cl.refresh_csv(self.multi_rows_query, params=params)
 
             # Assert
             self.assertEqual(
@@ -111,7 +115,8 @@ async def test_refresh_csv_with_filters(self):
     @unittest.skip("Large performance tier doesn't currently work.")
     async def test_refresh_context_manager_performance_large(self):
         async with AsyncDuneClient(self.valid_api_key) as cl:
-            results = (await cl.refresh(self.query, performance="large")).get_rows()
+            params = RefreshParams(performance="large")
+            results = (await cl.refresh(self.query, params=params)).get_rows()
             self.assertGreater(len(results), 0)
 
     async def test_get_latest_result_with_query_object(self):
diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py
index 799c0c1..99d2342 100644
--- a/tests/e2e/test_client.py
+++ b/tests/e2e/test_client.py
@@ -6,6 +6,7 @@
 import dotenv
 import pandas
 
+from dune_client.api.extensions import GetLatestResultParams, RunQueryParams
 from dune_client.models import (
     ExecutionState,
     ExecutionResponse,
@@ -78,7 +79,8 @@ def test_run_query_paginated(self):
         dune = DuneClient(self.valid_api_key)
 
         # Act
-        results = dune.run_query(self.multi_rows_query, batch_size=1).get_rows()
+        params = RunQueryParams(batch_size=1)
+        results = dune.run_query(self.multi_rows_query, params=params).get_rows()
 
         # Assert
         self.assertEqual(
@@ -97,7 +99,8 @@ def test_run_query_with_filters(self):
         dune = DuneClient(self.valid_api_key)
 
         # Act
-        results = dune.run_query(self.multi_rows_query, filters="number < 3").get_rows()
+        params = RunQueryParams(filters="number < 3")
+        results = dune.run_query(self.multi_rows_query, params=params).get_rows()
 
         # Assert
         self.assertEqual(
@@ -110,7 +113,8 @@ def test_run_query_with_filters(self):
     def test_run_query_performance_large(self):
         dune = DuneClient(self.valid_api_key)
 
-        results = dune.run_query(self.query, performance="large").get_rows()
+        params = RunQueryParams(performance="large")
+        results = dune.run_query(self.query, params=params).get_rows()
         self.assertGreater(len(results), 0)
 
     def test_run_query_dataframe(self):
@@ -328,7 +332,8 @@ def test_download_csv_with_pagination(self):
         client.run_query(self.multi_rows_query)
 
         # Act
-        result_csv = client.download_csv(self.multi_rows_query.query_id, batch_size=1)
+        params = GetLatestResultParams(batch_size=1)
+        result_csv = client.download_csv(self.multi_rows_query.query_id, params=params)
 
         # Assert
         self.assertEqual(
@@ -348,9 +353,10 @@ def test_download_csv_with_filters(self):
         client.run_query(self.multi_rows_query)
 
         # Act
+        params = GetLatestResultParams(filters="number < 3")
         result_csv = client.download_csv(
             self.multi_rows_query.query_id,
-            filters="number < 3",
+            params=params,
         )
 
         # Assert
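
One change not exercised by the tests above is `update_query`, which now takes `UpdateQueryParams`; only the fields you set are sent, and an empty params object results in no request at all. A sketch with a hypothetical query id (Plus subscription assumed):

```python
from dune_client.api.query import UpdateQueryParams
from dune_client.client import DuneClient

client = DuneClient(api_key="...")  # assumes a valid Dune API key

# Unset fields are omitted from the request body entirely
params = UpdateQueryParams(name="renamed query", tags=["api"])
client.update_query(1234567, params=params)  # hypothetical query id
```
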