Skip to content

Commit

Permalink
feat(ingest/fivetran): protect against high sync volume (datahub-proj…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 18, 2024
1 parent dfd7293 commit 8f7f2c1
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_log_api import (
MAX_JOBS_PER_CONNECTOR,
FivetranLogAPI,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
Expand Down Expand Up @@ -72,11 +75,6 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):

self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)

# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
self, self.config, self.ctx
)

def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
input_dataset_urn_list: List[DatasetUrn] = []
output_dataset_urn_list: List[DatasetUrn] = []
Expand Down Expand Up @@ -267,6 +265,13 @@ def _get_connector_workunits(
).as_workunit(is_primary_source=False)

# Map Fivetran's job/sync history entity with Datahub's data process entity
if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
self.report.warning(
title="Not all sync history was captured",
message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. "
f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)
for job in connector.jobs:
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)
Expand All @@ -279,7 +284,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
self.stale_entity_removal_handler.workunit_processor,
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

logger: logging.Logger = logging.getLogger(__name__)

# We don't want to generate a massive number of dataProcesses for a single connector.
# This is primarily used as a safeguard to prevent performance issues.
MAX_JOBS_PER_CONNECTOR = 1000


class FivetranLogAPI:
def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
Expand Down Expand Up @@ -158,34 +162,32 @@ def _get_table_lineage(

return table_lineage_list

def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]:
sync_logs = {}
for row in self._query(
self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
syncs_interval=syncs_interval,
)
):
if row[Constant.CONNECTOR_ID] not in sync_logs:
sync_logs[row[Constant.CONNECTOR_ID]] = {
row[Constant.SYNC_ID]: {
row["message_event"]: (
row[Constant.TIME_STAMP].timestamp(),
row[Constant.MESSAGE_DATA],
)
}
}
elif row[Constant.SYNC_ID] not in sync_logs[row[Constant.CONNECTOR_ID]]:
sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]] = {
row["message_event"]: (
row[Constant.TIME_STAMP].timestamp(),
row[Constant.MESSAGE_DATA],
)
}
else:
sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]][
row["message_event"]
] = (row[Constant.TIME_STAMP].timestamp(), row[Constant.MESSAGE_DATA])
def _get_all_connector_sync_logs(
self, syncs_interval: int, connector_ids: List[str]
) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}

# Format connector_ids as a comma-separated string of quoted IDs
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)

query = self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
syncs_interval=syncs_interval,
max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
connector_ids=formatted_connector_ids,
)

for row in self._query(query):
connector_id = row[Constant.CONNECTOR_ID]
sync_id = row[Constant.SYNC_ID]

if connector_id not in sync_logs:
sync_logs[connector_id] = {}

sync_logs[connector_id][sync_id] = {
"sync_start": (row["start_time"].timestamp(), None),
"sync_end": (row["end_time"].timestamp(), row["end_message_data"]),
}

return sync_logs

Expand Down Expand Up @@ -244,7 +246,10 @@ def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
def _fill_connectors_jobs(
self, connectors: List[Connector], syncs_interval: int
) -> None:
sync_logs = self._get_all_connector_sync_logs(syncs_interval)
connector_ids = [connector.connector_id for connector in connectors]
sync_logs = self._get_all_connector_sync_logs(
syncs_interval, connector_ids=connector_ids
)
for connector in connectors:
connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,32 @@ def get_users_query(self) -> str:

def get_sync_logs_query(self) -> str:
return """
SELECT connector_id,
sync_id,
message_event,
message_data,
time_stamp
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'"""
WITH ranked_syncs AS (
SELECT
connector_id,
sync_id,
MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
AND connector_id IN ({connector_ids})
GROUP BY connector_id, sync_id
)
SELECT
connector_id,
sync_id,
start_time,
end_time,
end_message_data
FROM ranked_syncs
WHERE rn <= {max_jobs_per_connector}
AND start_time IS NOT NULL
AND end_time IS NOT NULL
ORDER BY connector_id, end_time DESC
"""

def get_table_lineage_query(self) -> str:
return f"""
Expand Down
58 changes: 13 additions & 45 deletions metadata-ingestion/tests/integration/fivetran/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,64 +101,32 @@ def default_query_results(
}
]
elif query == fivetran_log_query.get_sync_logs_query().format(
db_clause=fivetran_log_query.db_clause, syncs_interval=7
db_clause=fivetran_log_query.db_clause,
syncs_interval=7,
max_jobs_per_connector=1000,
connector_ids="'calendar_elected'",
):
return [
{
"connector_id": "calendar_elected",
"sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
"start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
"end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
"end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
},
{
"connector_id": "calendar_elected",
"sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000),
"start_time": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000),
"end_time": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000),
"end_message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"',
},
{
"connector_id": "calendar_elected",
"sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000),
},
{
"connector_id": "calendar_elected",
"sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 5, 403000),
},
{
"connector_id": "calendar_elected",
"sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
"message_event": "sync_end",
"message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
"time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
},
{
"connector_id": "calendar_elected",
"sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
"message_event": "sync_end",
"message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"',
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000),
},
{
"connector_id": "calendar_elected",
"sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
"message_event": "sync_end",
"message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
"time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
},
{
"connector_id": "calendar_elected",
"sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832",
"message_event": "sync_end",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 35, 478000),
"start_time": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000),
"end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
"end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
},
]
# Unreachable code
Expand Down

0 comments on commit 8f7f2c1

Please sign in to comment.