From 6945ac632c9def9ae0e48b81d4e1444fa0c5f3f4 Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Tue, 22 Oct 2024 14:30:40 -0300 Subject: [PATCH 1/6] DCV-2973 changes to Datacoves secrets handling --- dbt_coves/config/config.py | 12 ++++----- dbt_coves/tasks/generate/airflow_dags.py | 21 +++++++++------ dbt_coves/utils/flags.py | 20 +++++++------- dbt_coves/utils/secrets.py | 33 +++++++++++++++++++----- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/dbt_coves/config/config.py b/dbt_coves/config/config.py index 68effc5d..c5bd1f3f 100644 --- a/dbt_coves/config/config.py +++ b/dbt_coves/config/config.py @@ -66,7 +66,7 @@ class GenerateAirflowDagsModel(BaseModel): secrets_manager: Optional[str] = "" secrets_url: Optional[str] = "" secrets_token: Optional[str] = "" - secrets_project: Optional[str] = "" + secrets_environment: Optional[str] = "" secrets_tags: Optional[str] = "" secrets_key: Optional[str] = "" @@ -100,7 +100,7 @@ class LoadAirbyteModel(BaseModel): secrets_manager: Optional[str] = "" secrets_url: Optional[str] = "" secrets_token: Optional[str] = "" - secrets_project: Optional[str] = "" + secrets_environment: Optional[str] = "" secrets_tags: Optional[List[str]] = [] secrets_key: Optional[str] = "" @@ -114,7 +114,7 @@ class LoadFivetranModel(BaseModel): secrets_manager: Optional[str] = "" secrets_url: Optional[str] = "" secrets_token: Optional[str] = "" - secrets_project: Optional[str] = "" + secrets_environment: Optional[str] = "" secrets_tags: Optional[List[str]] = [] secrets_key: Optional[str] = "" @@ -224,7 +224,7 @@ class DbtCovesConfig: "generate.airflow_dags.secrets_manager", "generate.airflow_dags.secrets_url", "generate.airflow_dags.secrets_token", - "generate.airflow_dags.secrets_project", + "generate.airflow_dags.secrets_environment", "generate.airflow_dags.secrets_tags", "generate.airflow_dags.secrets_key", "extract.airbyte.path", @@ -237,7 +237,7 @@ class DbtCovesConfig: "load.airbyte.secrets_manager", "load.airbyte.secrets_url", "load.airbyte.secrets_token", - "load.airbyte.secrets_project", + "load.airbyte.secrets_environment", "load.airbyte.secrets_tags", "load.airbyte.secrets_key", "setup.no_prompt", @@ -259,7 +259,7 @@ class DbtCovesConfig: "load.fivetran.secrets_manager", "load.fivetran.secrets_url", "load.fivetran.secrets_token", - "load.fivetran.secrets_project", + "load.fivetran.secrets_environment", "load.fivetran.secrets_tags", "load.fivetran.secrets_key", "data_sync.redshift.tables", diff --git a/dbt_coves/tasks/generate/airflow_dags.py b/dbt_coves/tasks/generate/airflow_dags.py index d06260fb..fdbec316 100644 --- a/dbt_coves/tasks/generate/airflow_dags.py +++ b/dbt_coves/tasks/generate/airflow_dags.py @@ -13,7 +13,7 @@ from dbt_coves.core.exceptions import MissingArgumentException from dbt_coves.tasks.base import NonDbtBaseTask -from dbt_coves.utils.secrets import load_secret_manager_data +from dbt_coves.utils.secrets import load_secret_manager_data, replace_secrets from dbt_coves.utils.tracking import trackable from dbt_coves.utils.yaml import deep_merge @@ -267,10 +267,13 @@ def build_dag_file(self, destination_path: Path, dag_name: str, yml_dag: Dict[st console.print(f"DAG {dag_name} resulted in an invalid DAG, skipping. Error: {exc}") def _merge_secret_nodes(self, secret_nodes, yml_dag) -> Dict[str, Any]: - for node_name, node_config in secret_nodes.get("nodes", {}).items(): - yml_node = yml_dag.get("nodes", {}).get(node_name) - if yml_node: - yml_dag["nodes"][node_name] = deep_merge(node_config, yml_node) + if isinstance(secret_nodes, dict): + for node_name, node_config in secret_nodes.get("nodes", {}).items(): + yml_node = yml_dag.get("nodes", {}).get(node_name) + if yml_node: + yml_dag["nodes"][node_name] = deep_merge(node_config, yml_node) + elif isinstance(secret_nodes, list): # Datacoves secrets + replace_secrets(secret_nodes, yml_dag) return yml_dag def _discover_secrets(self, yml_dag: Dict[str, Any]): @@ -283,9 +286,9 @@ def _discover_secrets(self, yml_dag: Dict[str, Any]): yml_dag = self._merge_secret_nodes(secret_data, yml_dag) if self.secrets_manager: - secret_data = load_secret_manager_data(self) - for secret in secret_data: - yml_dag = self._merge_secret_nodes(secret.get("value", {}), yml_dag) + self.secret_data = load_secret_manager_data(self) + if self.secret_data: + yml_dag = self._merge_secret_nodes(self.secret_data, yml_dag) return yml_dag @@ -321,6 +324,8 @@ def _merge_generator_configs(self, tg_conf: Dict[str, Any], generator: str) -> D """ generators_params = self.get_config_value("generators_params") coves_config_generators_params = generators_params.get(generator, {}) + if self.secrets_manager: + replace_secrets(self.secret_data, coves_config_generators_params) return deep_merge(tg_conf, coves_config_generators_params) def generate_task_group(self, tg_name: str, tg_conf: Dict[str, Any]): diff --git a/dbt_coves/utils/flags.py b/dbt_coves/utils/flags.py index 911a8d47..8f278304 100644 --- a/dbt_coves/utils/flags.py +++ b/dbt_coves/utils/flags.py @@ -84,7 +84,7 @@ def __init__(self, cli_parser: ArgumentParser) -> None: "secrets_manager": None, "secrets_url": None, "secrets_token": None, - "secrets_project": None, + "secrets_environment": None, "secrets_tags": None, "secrets_key": None, }, @@ -111,7 +111,7 @@ def __init__(self, cli_parser: ArgumentParser) -> None: "secrets_manager": None, "secrets_url": None, "secrets_token": None, - "secrets_project": None, + "secrets_environment": None, "secrets_tags": None, "secrets_key": None, }, @@ -124,7 +124,7 @@ def __init__(self, cli_parser: ArgumentParser) -> None: "secrets_manager": None, "secrets_url": None, "secrets_token": None, - "secrets_project": None, + "secrets_environment": None, "secrets_tags": None, "secrets_key": None, }, @@ -309,8 +309,10 @@ def parse_args(self, cli_args: List[str] = list()) -> None: self.generate["airflow_dags"]["secrets_url"] = self.args.secrets_url if self.args.secrets_token: self.generate["airflow_dags"]["secrets_token"] = self.args.secrets_token - if self.args.secrets_project: - self.generate["airflow_dags"]["secrets_project"] = self.args.secrets_project + if self.args.secrets_environment: + self.generate["airflow_dags"][ + "secrets_environment" + ] = self.args.secrets_environment if self.args.secrets_tags: self.generate["airflow_dags"]["secrets_tags"] = self.args.secrets_tags if self.args.secrets_key: @@ -332,8 +334,8 @@ def parse_args(self, cli_args: List[str] = list()) -> None: self.load["airbyte"]["secrets_url"] = self.args.secrets_url if self.args.secrets_token: self.load["airbyte"]["secrets_token"] = self.args.secrets_token - if self.args.secrets_project: - self.load["airbyte"]["secrets_project"] = self.args.secrets_project + if self.args.secrets_environment: + self.load["airbyte"]["secrets_environment"] = self.args.secrets_environment if self.args.secrets_tags: self.load["airbyte"]["secrets_tags"] = [ tag.strip() for tag in self.args.secrets_tags.split(",") @@ -359,8 +361,8 @@ def parse_args(self, cli_args: List[str] = list()) -> None: self.load["fivetran"]["secrets_url"] = self.args.secrets_url if self.args.secrets_token: self.load["fivetran"]["secrets_token"] = self.args.secrets_token - if self.args.secrets_project: - self.load["fivetran"]["secrets_project"] = self.args.secrets_project + if self.args.secrets_environment: + self.load["fivetran"]["secrets_environment"] = self.args.secrets_environment if self.args.secrets_tags: self.load["fivetran"]["secrets_tags"] = [ tag.strip() for tag in self.args.secrets_tags.split(",") diff --git a/dbt_coves/utils/secrets.py b/dbt_coves/utils/secrets.py index d0ededde..432fef6a 100644 --- a/dbt_coves/utils/secrets.py +++ b/dbt_coves/utils/secrets.py @@ -1,13 +1,17 @@ import os +import re import requests from dbt_coves.core.exceptions import DbtCovesException +SECRET_PATTERN = re.compile(r"\{\{\s*secret\('([^']+)'\)\s*\}\}", re.IGNORECASE) + def load_secret_manager_data(task_instance) -> dict: payload = {} manager = task_instance.secrets_manager.lower() + # breakpoint() if manager == "datacoves": # Contact the secrets manager and retrieve Secrets secrets_url = os.getenv("DATACOVES__SECRETS_URL") or task_instance.get_config_value( @@ -16,17 +20,16 @@ def load_secret_manager_data(task_instance) -> dict: secrets_token = os.getenv("DATACOVES__SECRETS_TOKEN") or task_instance.get_config_value( "secrets_token" ) - secrets_project = os.getenv("DATACOVES__SECRETS_PROJECT") or task_instance.get_config_value( - "secrets_project" - ) - - if not (secrets_url and secrets_token and secrets_project): + secrets_environment = os.getenv( + "DATACOVES__ENVIRONMENT_SLUG" + ) or task_instance.get_config_value("secrets_environment") + if not (secrets_url and secrets_token and secrets_environment): raise DbtCovesException( - "[b]secrets_url[/b], [b]secrets_project[/b] and [b]secrets_token[/b] must " + "[b]secrets_url[/b], [b]secrets_environment[/b] and [b]secrets_environment[/b] must " "be provided when using a Secrets Manager" ) - secrets_url = f"{secrets_url}/api/v1/secrets/{secrets_project}" + secrets_url = f"{secrets_url}/api/v1/secrets/{secrets_environment}" secrets_tags = task_instance.get_config_value("secrets_tags") secrets_key = task_instance.get_config_value("secrets_key") if secrets_tags: @@ -42,3 +45,19 @@ def load_secret_manager_data(task_instance) -> dict: return response.json() raise DbtCovesException(f"'{manager}' not recognized as a valid secrets manager.") + + +def replace_secrets(secrets_list, dictionary): + for key, value in dictionary.items(): + if isinstance(value, dict): + replace_secrets(secrets_list, value) + elif isinstance(value, str) and SECRET_PATTERN.search(value): + secret_found = False + for secret in secrets_list: + if secret.get("slug", "").lower() == SECRET_PATTERN.search(value).group(1).lower(): + secret_found = True + dictionary[key] = secret.get("value") + if not secret_found: + raise DbtCovesException( + f"Secret {SECRET_PATTERN.search(value).group(1)} not found in secrets" + ) From c6e8264148e04bf6094f24b520943641fd6d08f5 Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Tue, 22 Oct 2024 14:34:59 -0300 Subject: [PATCH 2/6] Change flag to secrets-environment --- dbt_coves/tasks/generate/airflow_dags.py | 2 +- dbt_coves/tasks/load/airbyte.py | 2 +- dbt_coves/tasks/load/fivetran.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt_coves/tasks/generate/airflow_dags.py b/dbt_coves/tasks/generate/airflow_dags.py index fdbec316..bad4920c 100644 --- a/dbt_coves/tasks/generate/airflow_dags.py +++ b/dbt_coves/tasks/generate/airflow_dags.py @@ -94,7 +94,7 @@ def register_parser(cls, sub_parsers, base_subparser): subparser.add_argument( "--secrets-token", type=str, help="Secret credentials provider token" ) - subparser.add_argument("--secrets-project", type=str, help="Secret credentials project") + subparser.add_argument("--secrets-environment", type=str, help="Secret credentials project") subparser.add_argument("--secrets-tags", type=str, help="Secret credentials tags") subparser.add_argument("--secrets-key", type=str, help="Secret credentials key") diff --git a/dbt_coves/tasks/load/airbyte.py b/dbt_coves/tasks/load/airbyte.py index c985d75f..32e89da7 100644 --- a/dbt_coves/tasks/load/airbyte.py +++ b/dbt_coves/tasks/load/airbyte.py @@ -63,7 +63,7 @@ def register_parser(cls, sub_parsers, base_subparser): subparser.add_argument( "--secrets-token", type=str, help="Secret credentials provider token" ) - subparser.add_argument("--secrets-project", type=str, help="Secret credentials project") + subparser.add_argument("--secrets-environment", type=str, help="Secret credentials project") subparser.add_argument("--secrets-tags", type=str, help="Secret credentials tags") subparser.add_argument("--secrets-key", type=str, help="Secret credentials key") subparser.set_defaults(cls=cls, which="airbyte") diff --git a/dbt_coves/tasks/load/fivetran.py b/dbt_coves/tasks/load/fivetran.py index 7c249bda..ade659f2 100644 --- a/dbt_coves/tasks/load/fivetran.py +++ b/dbt_coves/tasks/load/fivetran.py @@ -57,7 +57,7 @@ def register_parser(cls, sub_parsers, base_subparser): subparser.add_argument( "--secrets-token", type=str, help="Secret credentials provider token" ) - subparser.add_argument("--secrets-project", type=str, help="Secret credentials project") + subparser.add_argument("--secrets-environment", type=str, help="Secret credentials project") subparser.add_argument("--secrets-tags", type=str, help="Secret credentials tags") subparser.add_argument("--secrets-key", type=str, help="Secret credentials key") subparser.set_defaults(cls=cls, which="fivetran") From c5b0dcf5df5024705f1783ccec598d8acbda0737 Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Tue, 22 Oct 2024 15:44:53 -0300 Subject: [PATCH 3/6] Make secrets case sensitive, output when unmatching case is found --- dbt_coves/utils/secrets.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/dbt_coves/utils/secrets.py b/dbt_coves/utils/secrets.py index 432fef6a..51eb0ecb 100644 --- a/dbt_coves/utils/secrets.py +++ b/dbt_coves/utils/secrets.py @@ -51,13 +51,22 @@ def replace_secrets(secrets_list, dictionary): for key, value in dictionary.items(): if isinstance(value, dict): replace_secrets(secrets_list, value) - elif isinstance(value, str) and SECRET_PATTERN.search(value): - secret_found = False - for secret in secrets_list: - if secret.get("slug", "").lower() == SECRET_PATTERN.search(value).group(1).lower(): - secret_found = True - dictionary[key] = secret.get("value") - if not secret_found: - raise DbtCovesException( - f"Secret {SECRET_PATTERN.search(value).group(1)} not found in secrets" - ) + elif isinstance(value, str): + value_secret = SECRET_PATTERN.search(value) + if value_secret: + secret_key = value_secret.group(1) + secret_found = False + for secret in secrets_list: + # fivetran_api_key + # datacoves X + if secret.get("slug", "").lower() == secret_key.lower(): + if secret.get("slug", "") == secret_key: + secret_found = True + dictionary[key] = secret.get("value") + else: + raise DbtCovesException( + f"Secret [red]{secret_key}[/red] not found in secrets, " + f"found case-unmatching [red]{secret.get('slug')}[/red]" + ) + if not secret_found: + raise DbtCovesException(f"Secret {secret_key} not found in secrets") From d2144e5f1fb8d3b4b881692f5055948624695cfa Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Tue, 22 Oct 2024 15:51:55 -0300 Subject: [PATCH 4/6] Beautify error message --- dbt_coves/utils/secrets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_coves/utils/secrets.py b/dbt_coves/utils/secrets.py index 51eb0ecb..a2e72cfd 100644 --- a/dbt_coves/utils/secrets.py +++ b/dbt_coves/utils/secrets.py @@ -69,4 +69,4 @@ def replace_secrets(secrets_list, dictionary): f"found case-unmatching [red]{secret.get('slug')}[/red]" ) if not secret_found: - raise DbtCovesException(f"Secret {secret_key} not found in secrets") + raise DbtCovesException(f"Secret [red]{secret_key}[/red] not found in secrets") From 65315f506cd7562429d7ce30adf8ba1aa3d56d4b Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Thu, 24 Oct 2024 17:19:01 -0300 Subject: [PATCH 5/6] Make unmatching/missing secrets fail only once --- dbt_coves/utils/secrets.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/dbt_coves/utils/secrets.py b/dbt_coves/utils/secrets.py index a2e72cfd..84aa611c 100644 --- a/dbt_coves/utils/secrets.py +++ b/dbt_coves/utils/secrets.py @@ -11,7 +11,6 @@ def load_secret_manager_data(task_instance) -> dict: payload = {} manager = task_instance.secrets_manager.lower() - # breakpoint() if manager == "datacoves": # Contact the secrets manager and retrieve Secrets secrets_url = os.getenv("DATACOVES__SECRETS_URL") or task_instance.get_config_value( @@ -47,10 +46,10 @@ def load_secret_manager_data(task_instance) -> dict: raise DbtCovesException(f"'{manager}' not recognized as a valid secrets manager.") -def replace_secrets(secrets_list, dictionary): +def replace_secrets(secrets_list, dictionary, errors: set = set()): for key, value in dictionary.items(): if isinstance(value, dict): - replace_secrets(secrets_list, value) + replace_secrets(secrets_list, value, errors) elif isinstance(value, str): value_secret = SECRET_PATTERN.search(value) if value_secret: @@ -60,13 +59,17 @@ def replace_secrets(secrets_list, dictionary): # fivetran_api_key # datacoves X if secret.get("slug", "").lower() == secret_key.lower(): + secret_found = True if secret.get("slug", "") == secret_key: - secret_found = True dictionary[key] = secret.get("value") else: - raise DbtCovesException( + errors.add( f"Secret [red]{secret_key}[/red] not found in secrets, " - f"found case-unmatching [red]{secret.get('slug')}[/red]" + f"did you mean [red]{secret.get('slug')}[/red]" ) if not secret_found: - raise DbtCovesException(f"Secret [red]{secret_key}[/red] not found in secrets") + errors.add(f"Secret [red]{secret_key}[/red] not found in secrets") + if errors: + error_message = "Errors found:\n" + error_message += "\n".join(errors) + raise DbtCovesException(error_message) From 84aab41cefdbe3d30837cd98ea294d6637259f2d Mon Sep 17 00:00:00 2001 From: Bruno Antonellini Date: Thu, 24 Oct 2024 17:19:57 -0300 Subject: [PATCH 6/6] Remove comments --- dbt_coves/utils/secrets.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt_coves/utils/secrets.py b/dbt_coves/utils/secrets.py index 84aa611c..deff5aa4 100644 --- a/dbt_coves/utils/secrets.py +++ b/dbt_coves/utils/secrets.py @@ -56,8 +56,6 @@ def replace_secrets(secrets_list, dictionary, errors: set = set()): secret_key = value_secret.group(1) secret_found = False for secret in secrets_list: - # fivetran_api_key - # datacoves X if secret.get("slug", "").lower() == secret_key.lower(): secret_found = True if secret.get("slug", "") == secret_key: