From d0378d22ab8c59ef79f001e033cfe938c8f3a348 Mon Sep 17 00:00:00 2001
From: Ben Cassell <98852248+benc-db@users.noreply.github.com>
Date: Tue, 15 Oct 2024 14:47:01 -0700
Subject: [PATCH] Implement microbatch incremental strategy (#825)

---
 CHANGELOG.md                                   |  1 +
 dbt/adapters/databricks/__version__.py         |  2 +-
 dbt/adapters/databricks/impl.py                | 21 ++++++++++++-------
 .../incremental/strategies.sql                 | 15 +++++++++++++
 .../materializations/incremental/validate.sql  |  4 ++--
 .../functional/adapter/microbatch/fixtures.py  | 16 ++++++++++++++
 .../adapter/microbatch/test_microbatch.py      | 16 ++++++++++++++
 7 files changed, 65 insertions(+), 10 deletions(-)
 create mode 100644 tests/functional/adapter/microbatch/fixtures.py
 create mode 100644 tests/functional/adapter/microbatch/test_microbatch.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4de6ba8e3..8238716f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 - Add `include_full_name_in_path` config boolean for external locations. This writes tables to {location_root}/{catalog}/{schema}/{table} ([823](https://github.com/databricks/dbt-databricks/pull/823))
 - Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
 - Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
+- Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825))
 
 ### Under the Hood
 
diff --git a/dbt/adapters/databricks/__version__.py b/dbt/adapters/databricks/__version__.py
index 192c2fde1..ddfbfc125 100644
--- a/dbt/adapters/databricks/__version__.py
+++ b/dbt/adapters/databricks/__version__.py
@@ -1 +1 @@
-version: str = "1.8.7"
+version: str = "1.9.0b1"
diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py
index c569f164e..b275aa4fc 100644
--- a/dbt/adapters/databricks/impl.py
+++ b/dbt/adapters/databricks/impl.py
@@ -86,6 +86,7 @@
 from dbt_common.utils import executor
 from dbt_common.utils.dict import AttrDict
 from dbt_common.exceptions import DbtConfigError
+from dbt_common.exceptions import DbtInternalError
 from dbt_common.contracts.config.base import BaseConfig
 
 if TYPE_CHECKING:
@@ -650,7 +651,7 @@ def run_sql_for_tests(
             conn.transaction_open = False
 
     def valid_incremental_strategies(self) -> List[str]:
-        return ["append", "merge", "insert_overwrite", "replace_where"]
+        return ["append", "merge", "insert_overwrite", "replace_where", "microbatch"]
 
     @property
     def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
@@ -699,12 +700,18 @@ def get_persist_doc_columns(
         # an error when we tried to alter the table.
         for column in existing_columns:
             name = column.column
-            if (
-                name in columns
-                and "description" in columns[name]
-                and columns[name]["description"] != (column.comment or "")
-            ):
-                return_columns[name] = columns[name]
+            if name in columns:
+                config_column = columns[name]
+                if isinstance(config_column, dict):
+                    comment = columns[name].get("description", "")
+                elif hasattr(config_column, "description"):
+                    comment = config_column.description
+                else:
+                    raise DbtInternalError(
+                        f"Column {name} in model config is not a dictionary or ColumnInfo object."
+                    )
+                if comment != (column.comment or ""):
+                    return_columns[name] = columns[name]
 
         return return_columns
 
diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql
index 9a3fae21d..1a03ee9fd 100644
--- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql
@@ -170,3 +170,18 @@ select {{source_cols_csv}} from {{ source_relation }}
 {%- endfor %})
 {%- endif -%}
 {% endmacro %}
+
+{% macro databricks__get_incremental_microbatch_sql(arg_dict) %}
+  {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+  {%- set event_time = model.config.event_time -%}
+  {%- set start_time = config.get("__dbt_internal_microbatch_event_time_start") -%}
+  {%- set end_time = config.get("__dbt_internal_microbatch_event_time_end") -%}
+  {%- if start_time -%}
+    {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) >= '" ~ start_time ~ "'") -%}
+  {%- endif -%}
+  {%- if end_time -%}
+    {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) < '" ~ end_time ~ "'") -%}
+  {%- endif -%}
+  {%- do arg_dict.update({'incremental_predicates': incremental_predicates}) -%}
+  {{ return(get_replace_where_sql(arg_dict)) }}
+{% endmacro %}
\ No newline at end of file
diff --git a/dbt/include/databricks/macros/materializations/incremental/validate.sql b/dbt/include/databricks/macros/materializations/incremental/validate.sql
index 6b18e1938..7b5c5bd7e 100644
--- a/dbt/include/databricks/macros/materializations/incremental/validate.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/validate.sql
@@ -35,13 +35,13 @@
     Use the 'merge' or 'replace_where' strategy instead
   {%- endset %}
 
-  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
+  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where', 'microbatch'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
-   {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %}
+   {% if raw_strategy in ('replace_where', 'microbatch') and file_format not in ['delta'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
   {% endif %}
diff --git a/tests/functional/adapter/microbatch/fixtures.py b/tests/functional/adapter/microbatch/fixtures.py
new file mode 100644
index 000000000..9a6dc900d
--- /dev/null
+++ b/tests/functional/adapter/microbatch/fixtures.py
@@ -0,0 +1,16 @@
+schema = """version: 2
+models:
+  - name: input_model
+
+  - name: microbatch_model
+    config:
+      persist_docs:
+        relation: True
+        columns: True
+    description: This is a microbatch model
+    columns:
+      - name: id
+        description: "Id of the model"
+      - name: event_time
+        description: "Timestamp of the event"
+"""
diff --git a/tests/functional/adapter/microbatch/test_microbatch.py b/tests/functional/adapter/microbatch/test_microbatch.py
new file mode 100644
index 000000000..4bf66a227
--- /dev/null
+++ b/tests/functional/adapter/microbatch/test_microbatch.py
@@ -0,0 +1,16 @@
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+import pytest
+
+from tests.functional.adapter.microbatch import fixtures
+
+
+class TestDatabricksMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def models(self, microbatch_model_sql, input_model_sql):
+        return {
+            "schema.yml": fixtures.schema,
+            "input_model.sql": input_model_sql,
+            "microbatch_model.sql": microbatch_model_sql,
+        }
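
Note: a model opting into the new strategy might be declared as follows. This
is a minimal sketch assuming the dbt-core 1.9 microbatch config keys
(event_time, begin, batch_size); the model, column, and ref names are
illustrative and do not come from this patch.

    -- models/microbatch_model.sql (hypothetical model file)
    {{
        config(
            materialized='incremental',
            incremental_strategy='microbatch',
            file_format='delta',
            event_time='event_time',
            begin='2024-01-01',
            batch_size='day'
        )
    }}

    -- dbt runs this select once per batch, filtered to the batch window
    select id, event_time
    from {{ ref('input_model') }}

At runtime dbt-core populates __dbt_internal_microbatch_event_time_start and
__dbt_internal_microbatch_event_time_end for each batch, and the new
databricks__get_incremental_microbatch_sql macro turns them into incremental
predicates of roughly this form (timestamps illustrative):

    cast(event_time as TIMESTAMP) >= '2024-01-01 00:00:00'
    cast(event_time as TIMESTAMP) < '2024-01-02 00:00:00'

It then delegates to get_replace_where_sql, so each batch replaces only its
own time slice of the target Delta table. That reuse of replace_where is also
why validate.sql restricts the microbatch strategy to the delta file format.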