[pull] master from datahub-project:master #1083

Merged (1 commit, Jan 16, 2025)
229 changes: 167 additions & 62 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
@@ -174,6 +174,8 @@
from datahub.utilities.stats_collections import TopKDict
from datahub.utilities.urns.dataset_urn import DatasetUrn

DEFAULT_PAGE_SIZE = 10

try:
# On earlier versions of the tableauserverclient, the NonXMLResponseError
# was thrown when reauthentication was necessary. We'll keep both exceptions
@@ -342,11 +344,140 @@ class PermissionIngestionConfig(ConfigModel):
)


class TableauPageSizeConfig(ConfigModel):
"""
Configuration for setting page sizes for different Tableau metadata objects.

Some considerations:
- All have default values, so no setting is mandatory.
- In general, via the `effective_*` properties, fine-grained page sizes fall back to `page_size`
(or a multiple of it) when they are not explicitly set.

Measuring the impact of changing these values can be done by looking at the
`num_(filter_|paginated_)?queries_by_connection_type` metrics in the report.
"""

page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
)

database_server_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of database servers to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_database_server_page_size(self) -> int:
return self.database_server_page_size or self.page_size

# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
# returns warnings like this:
# {
# 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
# 'extensions': {
# 'severity': 'WARNING',
# 'code': 'NODE_LIMIT_EXCEEDED',
# 'properties': {
# 'nodeLimit': 20000
# }
# }
# }
# Reducing the page size for the workbook queries helps to avoid this.
workbook_page_size: Optional[int] = Field(
default=1,
description="[advanced] Number of workbooks to query at a time using the Tableau API; defaults to `1` and falls back to `page_size` if explicitly set to `None`.",
)

@property
def effective_workbook_page_size(self) -> int:
return self.workbook_page_size or self.page_size

sheet_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of sheets to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_sheet_page_size(self) -> int:
return self.sheet_page_size or self.page_size

dashboard_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of dashboards to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_dashboard_page_size(self) -> int:
return self.dashboard_page_size or self.page_size

embedded_datasource_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of embedded datasources to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_embedded_datasource_page_size(self) -> int:
return self.embedded_datasource_page_size or self.page_size

# Since the field upstream query was split out of the embedded datasource query into an independent query,
# the number of queries increased significantly, and so did the execution time.
# Increasing the page size for this particular case improves batching and therefore reduces the number of queries.
#
# That's why, unless specifically set, we effectively use 10 times `page_size` for this query.
embedded_datasource_field_upstream_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API; falls back to `page_size` * 10 if not set.",
)

@property
def effective_embedded_datasource_field_upstream_page_size(self) -> int:
return self.embedded_datasource_field_upstream_page_size or self.page_size * 10

published_datasource_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of published datasources to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_published_datasource_page_size(self) -> int:
return self.published_datasource_page_size or self.page_size

published_datasource_field_upstream_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of upstream fields to query at a time for published datasources using the Tableau API; falls back to `page_size` * 10 if not set.",
)

@property
def effective_published_datasource_field_upstream_page_size(self) -> int:
return self.published_datasource_field_upstream_page_size or self.page_size * 10

custom_sql_table_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of custom SQL tables to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_custom_sql_table_page_size(self) -> int:
return self.custom_sql_table_page_size or self.page_size

database_table_page_size: Optional[int] = Field(
default=None,
description="[advanced] Number of database tables to query at a time using the Tableau API; falls back to `page_size` if not set.",
)

@property
def effective_database_table_page_size(self) -> int:
return self.database_table_page_size or self.page_size
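
For illustration only (not part of this diff), here is a minimal sketch of how the `effective_*` fallbacks above resolve. It uses a plain pydantic stand-in rather than DataHub's `ConfigModel` and mirrors only a subset of the fields; the field names and fallback rules follow the class above.

# Illustrative stand-in for TableauPageSizeConfig using plain pydantic.
from typing import Optional

from pydantic import BaseModel, Field

DEFAULT_PAGE_SIZE = 10


class PageSizeSketch(BaseModel):
    page_size: int = Field(default=DEFAULT_PAGE_SIZE)
    workbook_page_size: Optional[int] = Field(default=1)
    embedded_datasource_field_upstream_page_size: Optional[int] = Field(default=None)

    @property
    def effective_workbook_page_size(self) -> int:
        # An explicit integer wins; None falls back to page_size.
        return self.workbook_page_size or self.page_size

    @property
    def effective_embedded_datasource_field_upstream_page_size(self) -> int:
        # Unset means 10x page_size, to batch field-upstream queries more aggressively.
        return self.embedded_datasource_field_upstream_page_size or self.page_size * 10


cfg = PageSizeSketch(page_size=1000, workbook_page_size=None)
assert cfg.effective_workbook_page_size == 1000  # None -> falls back to page_size
assert cfg.effective_embedded_datasource_field_upstream_page_size == 10_000  # 10 * page_size

Because the fallbacks use `or`, explicitly setting a fine-grained size to `0` also resolves to `page_size` (or to `10 * page_size` for the field-upstream sizes).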


class TableauConfig(
DatasetLineageProviderConfigBase,
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
TableauConnectionConfig,
TableauPageSizeConfig,
):
projects: Optional[List[str]] = Field(
default=["default"],
@@ -396,40 +527,6 @@ class TableauConfig(
description="Ingest details for tables external to (not embedded in) tableau as entities.",
)

page_size: int = Field(
default=10,
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
)

# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
# returns warnings like this:
# {
# 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
# 'extensions': {
# 'severity': 'WARNING',
# 'code': 'NODE_LIMIT_EXCEEDED',
# 'properties': {
# 'nodeLimit': 20000
# }
# }
# }
# Reducing the page size for the workbook queries helps to avoid this.
workbook_page_size: int = Field(
default=1,
description="[advanced] Number of workbooks to query at a time using the Tableau API.",
)

# Since the field upstream query was separated from the embedded datasource queries into an independent query,
# the number of queries increased significantly and so the execution time.
# To increase the batching and so reduce the number of queries, we can increase the page size for that
# particular case.
#
# `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report will help to understand the impact of this change.
embedded_datasource_field_upstream_page_size: int = Field(
default=100,
description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API.",
)

env: str = Field(
default=builder.DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs.",
@@ -1022,7 +1119,9 @@ def maybe_parse_hostname():
return server_connection

for database_server in self.get_connection_objects(
database_servers_graphql_query, c.DATABASE_SERVERS_CONNECTION
query=database_servers_graphql_query,
connection_type=c.DATABASE_SERVERS_CONNECTION,
page_size=self.config.effective_database_server_page_size,
):
database_server_id = database_server.get(c.ID)
server_connection = database_server.get(c.HOST_NAME)
@@ -1448,14 +1547,13 @@ def get_connection_objects(
self,
query: str,
connection_type: str,
page_size: int,
query_filter: dict = {},
page_size_override: Optional[int] = None,
) -> Iterable[dict]:
query_filter = optimize_query_filter(query_filter)

# Calls the get_connection_object_page function to get the objects,
# and automatically handles pagination.
page_size = page_size_override or self.config.page_size

filter_pages = get_filter_pages(query_filter, page_size)
self.report.num_queries_by_connection_type[connection_type] += 1
@@ -1500,10 +1598,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
projects = {c.PROJECT_NAME_WITH_IN: project_names}

for workbook in self.get_connection_objects(
workbook_graphql_query,
c.WORKBOOKS_CONNECTION,
projects,
page_size_override=self.config.workbook_page_size,
query=workbook_graphql_query,
connection_type=c.WORKBOOKS_CONNECTION,
query_filter=projects,
page_size=self.config.effective_workbook_page_size,
):
# This check is needed because we use projectNameWithin, which returns projects by name, so if the
# user wants to ingest only the nested project C from A->B->C then Tableau might return more than one Project
@@ -1958,9 +2056,10 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:

custom_sql_connection = list(
self.get_connection_objects(
custom_sql_graphql_query,
c.CUSTOM_SQL_TABLE_CONNECTION,
custom_sql_filter,
query=custom_sql_graphql_query,
connection_type=c.CUSTOM_SQL_TABLE_CONNECTION,
query_filter=custom_sql_filter,
page_size=self.config.effective_custom_sql_table_page_size,
)
)

@@ -2669,7 +2768,7 @@ def update_datasource_for_field_upstream(
self,
datasource: dict,
field_upstream_query: str,
page_size_override: Optional[int] = None,
page_size: int,
) -> dict:
# Collect field ids to fetch field upstreams
field_ids: List[str] = []
@@ -2683,7 +2782,7 @@
query=field_upstream_query,
connection_type=c.FIELDS_CONNECTION,
query_filter={c.ID_WITH_IN: field_ids},
page_size_override=page_size_override,
page_size=page_size,
):
if field_upstream.get(c.ID):
field_id = field_upstream[c.ID]
Expand All @@ -2706,13 +2805,15 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used}

for datasource in self.get_connection_objects(
published_datasource_graphql_query,
c.PUBLISHED_DATA_SOURCES_CONNECTION,
datasource_filter,
query=published_datasource_graphql_query,
connection_type=c.PUBLISHED_DATA_SOURCES_CONNECTION,
query_filter=datasource_filter,
page_size=self.config.effective_published_datasource_page_size,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
field_upstream_query=datasource_upstream_fields_graphql_query,
page_size=self.config.effective_published_datasource_field_upstream_page_size,
)

yield from self.emit_datasource(datasource)
@@ -2730,9 +2831,10 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:

# Emitting tables that came from Tableau metadata
for tableau_table in self.get_connection_objects(
database_tables_graphql_query,
c.DATABASE_TABLES_CONNECTION,
tables_filter,
query=database_tables_graphql_query,
connection_type=c.DATABASE_TABLES_CONNECTION,
query_filter=tables_filter,
page_size=self.config.effective_database_table_page_size,
):
database_table = self.database_tables[
tableau_database_table_id_to_urn_map[tableau_table[c.ID]]
@@ -2921,9 +3023,10 @@ def emit_sheets(self) -> Iterable[MetadataWorkUnit]:
sheets_filter = {c.ID_WITH_IN: self.sheet_ids}

for sheet in self.get_connection_objects(
sheet_graphql_query,
c.SHEETS_CONNECTION,
sheets_filter,
query=sheet_graphql_query,
connection_type=c.SHEETS_CONNECTION,
query_filter=sheets_filter,
page_size=self.config.effective_sheet_page_size,
):
if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet):
yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK))
@@ -3241,9 +3344,10 @@ def emit_dashboards(self) -> Iterable[MetadataWorkUnit]:
dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids}

for dashboard in self.get_connection_objects(
dashboard_graphql_query,
c.DASHBOARDS_CONNECTION,
dashboards_filter,
query=dashboard_graphql_query,
connection_type=c.DASHBOARDS_CONNECTION,
query_filter=dashboards_filter,
page_size=self.config.effective_dashboard_page_size,
):
if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard):
yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK))
@@ -3388,14 +3492,15 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used}

for datasource in self.get_connection_objects(
embedded_datasource_graphql_query,
c.EMBEDDED_DATA_SOURCES_CONNECTION,
datasource_filter,
query=embedded_datasource_graphql_query,
connection_type=c.EMBEDDED_DATA_SOURCES_CONNECTION,
query_filter=datasource_filter,
page_size=self.config.effective_embedded_datasource_page_size,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
field_upstream_query=datasource_upstream_fields_graphql_query,
page_size_override=self.config.embedded_datasource_field_upstream_page_size,
page_size=self.config.effective_embedded_datasource_field_upstream_page_size,
)
yield from self.emit_datasource(
datasource,
Second changed file (Tableau ingestion test configuration):
@@ -69,8 +69,7 @@
"projects": ["default", "Project 2", "Samples"],
"extract_project_hierarchy": False,
"page_size": 1000,
"workbook_page_size": 1000,
"embedded_datasource_field_upstream_page_size": 1000,
"workbook_page_size": None,
"ingest_tags": True,
"ingest_owner": True,
"ingest_tables_external": True,
@@ -646,8 +645,7 @@ def test_tableau_ingest_with_platform_instance(
"platform_instance": "acryl_site1",
"projects": ["default", "Project 2"],
"page_size": 1000,
"workbook_page_size": 1000,
"embedded_datasource_field_upstream_page_size": 1000,
"workbook_page_size": None,
"ingest_tags": True,
"ingest_owner": True,
"ingest_tables_external": True,
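
As a closing usage note (a sketch based on the fields added in this PR, not part of the diff): the test configs above now set `workbook_page_size` to `None` instead of an explicit number, so workbook queries pick up the general `page_size` through the `effective_*` fallback. Assuming standard pydantic v1 parsing on DataHub's `ConfigModel`:

# Resolve the page sizes implied by the updated test configs above.
# The import path follows the file changed in this PR; parse_obj is pydantic v1 API.
from datahub.ingestion.source.tableau.tableau import TableauPageSizeConfig

cfg = TableauPageSizeConfig.parse_obj({"page_size": 1000, "workbook_page_size": None})

assert cfg.page_size == 1000
assert cfg.effective_workbook_page_size == 1000  # None -> falls back to page_size
assert cfg.effective_sheet_page_size == 1000  # unset -> page_size
assert cfg.effective_embedded_datasource_field_upstream_page_size == 10_000  # unset -> 10 * page_size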