Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #36 from azhard/bigquery-support
Browse files Browse the repository at this point in the history
Added BigQuery support
  • Loading branch information
jtcohen6 authored Oct 14, 2020
2 parents ea94198 + ba48647 commit 8b283c1
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 23 deletions.
24 changes: 24 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ jobs:
steps:
- checkout

- run:
run: setup_creds
command: |
echo $BIGQUERY_SERVICE_ACCOUNT_JSON > ${HOME}/bigquery-service-key.json
- restore_cache:
key: deps1-{{ .Branch }}

Expand Down Expand Up @@ -78,6 +83,25 @@ jobs:
dbt --warn-error run --target snowflake --full-refresh
dbt --warn-error run --target snowflake
- run:
name: "Run Tests - BigQuery"
environment:
BIGQUERY_SERVICE_KEY_PATH: "/home/circleci/bigquery-service-key.json"

command: |
. venv/bin/activate
echo `pwd`
cd integration_tests
dbt --warn-error deps --target bigquery
dbt --warn-error run-operation drop_audit_schema --target bigquery
dbt --warn-error run --target bigquery --full-refresh
dbt --warn-error run --target bigquery
dbt --warn-error run-operation drop_audit_schema --target bigquery
dbt --warn-error run-operation create_legacy_audit_table --target bigquery
dbt --warn-error run --target bigquery --full-refresh
dbt --warn-error run --target bigquery
- save_cache:
key: deps1-{{ .Branch }}
paths:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
target/
dbt_modules/
logs/

# pycharm
.idea/
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
> this package, especially as a post-hook. Please consider if this package is
> appropriate for your use case before using it.
Requires dbt >= 0.17.0
Requires dbt >= 0.18.0

This package provides out-of-the-box functionality to log events for all dbt
invocations, including run start, run end, model start, and model end. It
Expand Down Expand Up @@ -49,9 +49,11 @@ For example to always log into a specific schema, say `analytics_meta`, regardle

### Adapter support

This package is currently compatible with dbt's Snowflake, Redshift, and
This package is currently compatible with dbt's BigQuery<sup>1</sup>, Snowflake, Redshift, and
Postgres integrations.

<sup>1</sup> BigQuery support may only work when 1 thread is set in your `profiles.yml` file. Anything larger may result in "quota exceeded" errors.

### Migration guide

#### v0.1.17 -> v0.2.0
Expand Down
8 changes: 5 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ name: 'logging'
version: '0.3.0'
config-version: 2

require-dbt-version: [">=0.17.0"]
require-dbt-version: ">=0.18.0"

source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]

require-dbt-version: ">=0.16.0"

target-path: "target"
clean-targets:
- "target"
Expand All @@ -31,3 +29,7 @@ on-run-end:
models:
logging:
+schema: meta
bigquery:
+enabled: '{{ target.type == "bigquery" | as_bool }}'
default:
+enabled: '{{ target.type != "bigquery" | as_bool }}'
14 changes: 13 additions & 1 deletion integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ test-snowflake:
dbt run --target snowflake --full-refresh
dbt run --target snowflake

test-all: test-postgres test-redshift test-snowflake

test-bigquery:
dbt run-operation drop_audit_schema --target bigquery
dbt run --target bigquery --full-refresh
dbt run --target bigquery


dbt run-operation drop_audit_schema --target bigquery
dbt run-operation create_legacy_audit_table --target bigquery
dbt run --target bigquery --full-refresh
dbt run --target bigquery

test-all: test-postgres test-redshift test-snowflake test-bigquery
echo "Completed successfully"

test-cloud: test-redshift test-snowflake
Expand Down
8 changes: 8 additions & 0 deletions integration_tests/ci/sample.profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ integration_tests:
warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}"
schema: event_logging_integration_tests_snowflake
threads: 1

bigquery:
type: bigquery
method: service-account
keyfile: "{{ env_var('BIGQUERY_SERVICE_KEY_PATH') }}"
project: "{{ env_var('BIGQUERY_TEST_DATABASE') }}"
schema: event_logging_integration_tests_bigquery
threads: 1
12 changes: 6 additions & 6 deletions integration_tests/macros/create_old_audit_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

create table if not exists {{ logging.get_audit_relation() }}
(
event_name varchar(512),
event_timestamp {{dbt_utils.type_timestamp()}},
event_schema varchar(512),
event_model varchar(512),
invocation_id varchar(512)
event_name {{ dbt_utils.type_string() }},
event_timestamp {{ dbt_utils.type_timestamp() }},
event_schema {{ dbt_utils.type_string() }},
event_model {{ dbt_utils.type_string() }},
invocation_id {{ dbt_utils.type_string() }}
)
{{ dbt_utils.log_info("Created legacy audit table") }}
{% do dbt_utils.log_info("Created legacy audit table") %}
{% endmacro %}
4 changes: 3 additions & 1 deletion integration_tests/macros/drop_audit_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
{% if adapter.check_schema_exists(target.database, audit_schema) %}
{% set audit_schema_relation = api.Relation.create(database=target.database, schema=audit_schema).without_identifier() %}
{% do drop_schema(audit_schema_relation) %}
{% do run_query("commit;") %}
{% if adapter.type() != 'bigquery' %}
{% do run_query("commit;") %}
{% endif %}
{{ dbt_utils.log_info("Audit schema dropped")}}

{% else %}
Expand Down
42 changes: 33 additions & 9 deletions macros/audit.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@

{% endmacro %}

{% macro log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}

{% macro log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) -%}

{{ return(adapter.dispatch('log_audit_event', packages=['logging'])(event_name, schema, relation, user, target_name, is_full_refresh)) }}

{% endmacro %}

{% macro default__log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}

insert into {{ logging.get_audit_relation() }} (
event_name,
Expand Down Expand Up @@ -55,21 +62,31 @@


{% macro create_audit_schema() %}
create schema if not exists {{ logging.get_audit_schema() }}
{% do create_schema(api.Relation.create(
database=target.database,
schema=logging.get_audit_schema())
) %}
{% endmacro %}


{% macro create_audit_log_table() -%}

{{ return(adapter.dispatch('create_audit_log_table', packages=['logging'])()) }}

{% endmacro %}


{% macro default__create_audit_log_table() -%}

{% set required_columns = [
["event_name", "varchar(512)"],
["event_name", dbt_utils.type_string()],
["event_timestamp", dbt_utils.type_timestamp()],
["event_schema", "varchar(512)"],
["event_model", "varchar(512)"],
["event_user", "varchar(512)"],
["event_target", "varchar(512)"],
["event_schema", dbt_utils.type_string()],
["event_model", dbt_utils.type_string()],
["event_user", dbt_utils.type_string()],
["event_target", dbt_utils.type_string()],
["event_is_full_refresh", "boolean"],
["invocation_id", "varchar(512)"],
["invocation_id", dbt_utils.type_string()],
] -%}

{% set audit_table = logging.get_audit_relation() -%}
Expand Down Expand Up @@ -125,7 +142,7 @@


{% macro log_model_start_event() %}
{{logging.log_audit_event(
{{ logging.log_audit_event(
'model deployment started', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
) }}
{% endmacro %}
Expand All @@ -136,3 +153,10 @@
'model deployment completed', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
) }}
{% endmacro %}


{% macro log_custom_event(event_name) %}
{{ logging.log_audit_event(
event_name, schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
) }}
{% endmacro %}
73 changes: 73 additions & 0 deletions macros/bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{% macro bigquery__log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}

insert into {{ logging.get_audit_relation() }} (
event_name,
event_timestamp,
event_schema,
event_model,
event_target,
event_is_full_refresh,
invocation_id
)

values (
'{{ event_name }}',
{{ dbt_utils.current_timestamp_in_utc() }},
{% if schema != None %}'{{ schema }}'{% else %}null{% endif %},
{% if relation != None %}'{{ relation }}'{% else %}null{% endif %},
{% if target_name != None %}'{{ target_name }}'{% else %}null{% endif %},
{% if is_full_refresh %}TRUE{% else %}FALSE{% endif %},
'{{ invocation_id }}'
);

{% endmacro %}


{% macro bigquery__create_audit_log_table() -%}

{% set required_columns = [
["event_name", dbt_utils.type_string()],
["event_timestamp", dbt_utils.type_timestamp()],
["event_schema", dbt_utils.type_string()],
["event_model", dbt_utils.type_string()],
["event_target", dbt_utils.type_string()],
["event_is_full_refresh", "BOOLEAN"],
["invocation_id", dbt_utils.type_string()],
] -%}

{% set audit_table = logging.get_audit_relation() -%}

{% set audit_table_exists = adapter.get_relation(audit_table.database, audit_table.schema, audit_table.name) -%}


{% if audit_table_exists -%}

{%- set columns_to_create = [] -%}

{# map to lower to cater for snowflake returning column names as upper case #}
{%- set existing_columns = adapter.get_columns_in_relation(audit_table)|map(attribute='column')|map('lower')|list -%}

{%- for required_column in required_columns -%}
{%- if required_column[0] not in existing_columns -%}
{%- do columns_to_create.append(required_column) -%}

{%- endif -%}
{%- endfor -%}


{%- for column in columns_to_create -%}
alter table {{ audit_table }}
add column {{ column[0] }} {{ column[1] }}
default null;
{% endfor -%}

{%- else -%}
create table if not exists {{ audit_table }}
(
{% for column in required_columns %}
{{ column[0] }} {{ column[1] }}{% if not loop.last %},{% endif %}
{% endfor %}
)
{%- endif -%}

{%- endmacro %}
33 changes: 33 additions & 0 deletions models/bigquery/stg_dbt_deployments.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
with events as (

select * from {{ref('stg_dbt_audit_log')}}

),

aggregated as (

select

invocation_id,
event_target as target,
event_is_full_refresh as is_full_refresh,

min(case
when event_name = 'run started' then event_timestamp
end) as deployment_started_at,

min(case
when event_name = 'run completed' then event_timestamp
end) as deployment_completed_at,

count(distinct case
when event_name like '%model%' then event_model
end) as models_deployed

from events

{{ dbt_utils.group_by(n=3) }}

)

select * from aggregated
38 changes: 38 additions & 0 deletions models/bigquery/stg_dbt_model_deployments.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
with events as (

select * from {{ ref('stg_dbt_audit_log') }}

),

aggregated as (

select

{{ dbt_utils.surrogate_key([
'event_model',
'invocation_id'
]) }} as model_deployment_id,

invocation_id,
event_model as model,
event_schema as schema,
event_target as target,
event_is_full_refresh as is_full_refresh,

min(case
when event_name = 'model deployment started' then event_timestamp
end) as deployment_started_at,

min(case
when event_name = 'model deployment completed' then event_timestamp
end) as deployment_completed_at

from events

where event_name like '%model%'

{{ dbt_utils.group_by(n=6) }}

)

select * from aggregated
File renamed without changes.
2 changes: 1 addition & 1 deletion packages.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
packages:
- package: fishtown-analytics/dbt_utils
version: [">=0.4.0", "<0.7.0"]
version: [">=0.6.0", "<0.7.0"]

0 comments on commit 8b283c1

Please sign in to comment.