diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 3de51fcc5b6789..ee841a2a201863 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -174,6 +174,8 @@ from datahub.utilities.stats_collections import TopKDict from datahub.utilities.urns.dataset_urn import DatasetUrn +DEFAULT_PAGE_SIZE = 10 + try: # On earlier versions of the tableauserverclient, the NonXMLResponseError # was thrown when reauthentication was necessary. We'll keep both exceptions @@ -342,11 +344,140 @@ class PermissionIngestionConfig(ConfigModel): ) +class TableauPageSizeConfig(ConfigModel): + """ + Configuration for setting page sizes for different Tableau metadata objects. + + Some considerations: + - All have default values, so no setting is mandatory. + - In general, the `effective_*` properties resolve each fine-grained page size: when one is not explicitly set, it falls back to `page_size` + or is derived from `page_size`. + + Measuring the impact of changing these values can be done by looking at the + `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report. + """ + + page_size: int = Field( + default=DEFAULT_PAGE_SIZE, + description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.", + ) + + database_server_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of database servers to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_database_server_page_size(self) -> int: + return self.database_server_page_size or self.page_size + + # We've found that even with a small workbook page size (e.g. 10), the Tableau API often + # returns warnings like this: + # { + # 'message': 'Showing partial results. 
The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.', + # 'extensions': { + # 'severity': 'WARNING', + # 'code': 'NODE_LIMIT_EXCEEDED', + # 'properties': { + # 'nodeLimit': 20000 + # } + # } + # } + # Reducing the page size for the workbook queries helps to avoid this. + workbook_page_size: Optional[int] = Field( + default=1, + description="[advanced] Number of workbooks to query at a time using the Tableau API; defaults to `1` and fallbacks to `page_size` if not set.", + ) + + @property + def effective_workbook_page_size(self) -> int: + return self.workbook_page_size or self.page_size + + sheet_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of sheets to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_sheet_page_size(self) -> int: + return self.sheet_page_size or self.page_size + + dashboard_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of dashboards to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_dashboard_page_size(self) -> int: + return self.dashboard_page_size or self.page_size + + embedded_datasource_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of embedded datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_embedded_datasource_page_size(self) -> int: + return self.embedded_datasource_page_size or self.page_size + + # Since the field upstream query was separated from the embedded datasource queries into an independent query, + # the number of queries increased significantly, and so did the execution time. + # To batch more objects per request and thereby reduce the number of queries, we can increase the page size for that + # particular case. 
+ # + # That's why unless specifically set, we will effectively use 10 times the page size as the default page size. + embedded_datasource_field_upstream_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.", + ) + + @property + def effective_embedded_datasource_field_upstream_page_size(self) -> int: + return self.embedded_datasource_field_upstream_page_size or self.page_size * 10 + + published_datasource_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of published datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_published_datasource_page_size(self) -> int: + return self.published_datasource_page_size or self.page_size + + published_datasource_field_upstream_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of upstream fields to query at a time for published datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.", + ) + + @property + def effective_published_datasource_field_upstream_page_size(self) -> int: + return self.published_datasource_field_upstream_page_size or self.page_size * 10 + + custom_sql_table_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of custom sql datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_custom_sql_table_page_size(self) -> int: + return self.custom_sql_table_page_size or self.page_size + + database_table_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of database tables to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_database_table_page_size(self) -> int: + return self.database_table_page_size or 
self.page_size + + class TableauConfig( DatasetLineageProviderConfigBase, StatefulIngestionConfigBase, DatasetSourceConfigMixin, TableauConnectionConfig, + TableauPageSizeConfig, ): projects: Optional[List[str]] = Field( default=["default"], @@ -396,40 +527,6 @@ class TableauConfig( description="Ingest details for tables external to (not embedded in) tableau as entities.", ) - page_size: int = Field( - default=10, - description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.", - ) - - # We've found that even with a small workbook page size (e.g. 10), the Tableau API often - # returns warnings like this: - # { - # 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.', - # 'extensions': { - # 'severity': 'WARNING', - # 'code': 'NODE_LIMIT_EXCEEDED', - # 'properties': { - # 'nodeLimit': 20000 - # } - # } - # } - # Reducing the page size for the workbook queries helps to avoid this. - workbook_page_size: int = Field( - default=1, - description="[advanced] Number of workbooks to query at a time using the Tableau API.", - ) - - # Since the field upstream query was separated from the embedded datasource queries into an independent query, - # the number of queries increased significantly and so the execution time. - # To increase the batching and so reduce the number of queries, we can increase the page size for that - # particular case. - # - # `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report will help to understand the impact of this change. 
- embedded_datasource_field_upstream_page_size: int = Field( - default=100, - description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API.", - ) - env: str = Field( default=builder.DEFAULT_ENV, description="Environment to use in namespace when constructing URNs.", @@ -1022,7 +1119,9 @@ def maybe_parse_hostname(): return server_connection for database_server in self.get_connection_objects( - database_servers_graphql_query, c.DATABASE_SERVERS_CONNECTION + query=database_servers_graphql_query, + connection_type=c.DATABASE_SERVERS_CONNECTION, + page_size=self.config.effective_database_server_page_size, ): database_server_id = database_server.get(c.ID) server_connection = database_server.get(c.HOST_NAME) @@ -1448,14 +1547,13 @@ def get_connection_objects( self, query: str, connection_type: str, + page_size: int, query_filter: dict = {}, - page_size_override: Optional[int] = None, ) -> Iterable[dict]: query_filter = optimize_query_filter(query_filter) # Calls the get_connection_object_page function to get the objects, # and automatically handles pagination. 
- page_size = page_size_override or self.config.page_size filter_pages = get_filter_pages(query_filter, page_size) self.report.num_queries_by_connection_type[connection_type] += 1 @@ -1500,10 +1598,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: projects = {c.PROJECT_NAME_WITH_IN: project_names} for workbook in self.get_connection_objects( - workbook_graphql_query, - c.WORKBOOKS_CONNECTION, - projects, - page_size_override=self.config.workbook_page_size, + query=workbook_graphql_query, + connection_type=c.WORKBOOKS_CONNECTION, + query_filter=projects, + page_size=self.config.effective_workbook_page_size, ): # This check is needed as we are using projectNameWithin which return project as per project name so if # user want to ingest only nested project C from A->B->C then tableau might return more than one Project @@ -1958,9 +2056,10 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: custom_sql_connection = list( self.get_connection_objects( - custom_sql_graphql_query, - c.CUSTOM_SQL_TABLE_CONNECTION, - custom_sql_filter, + query=custom_sql_graphql_query, + connection_type=c.CUSTOM_SQL_TABLE_CONNECTION, + query_filter=custom_sql_filter, + page_size=self.config.effective_custom_sql_table_page_size, ) ) @@ -2669,7 +2768,7 @@ def update_datasource_for_field_upstream( self, datasource: dict, field_upstream_query: str, - page_size_override: Optional[int] = None, + page_size: int, ) -> dict: # Collect field ids to fetch field upstreams field_ids: List[str] = [] @@ -2683,7 +2782,7 @@ def update_datasource_for_field_upstream( query=field_upstream_query, connection_type=c.FIELDS_CONNECTION, query_filter={c.ID_WITH_IN: field_ids}, - page_size_override=page_size_override, + page_size=page_size, ): if field_upstream.get(c.ID): field_id = field_upstream[c.ID] @@ -2706,13 +2805,15 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used} for datasource in 
self.get_connection_objects( - published_datasource_graphql_query, - c.PUBLISHED_DATA_SOURCES_CONNECTION, - datasource_filter, + query=published_datasource_graphql_query, + connection_type=c.PUBLISHED_DATA_SOURCES_CONNECTION, + query_filter=datasource_filter, + page_size=self.config.effective_published_datasource_page_size, ): datasource = self.update_datasource_for_field_upstream( datasource=datasource, field_upstream_query=datasource_upstream_fields_graphql_query, + page_size=self.config.effective_published_datasource_field_upstream_page_size, ) yield from self.emit_datasource(datasource) @@ -2730,9 +2831,10 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]: # Emitting tables that came from Tableau metadata for tableau_table in self.get_connection_objects( - database_tables_graphql_query, - c.DATABASE_TABLES_CONNECTION, - tables_filter, + query=database_tables_graphql_query, + connection_type=c.DATABASE_TABLES_CONNECTION, + query_filter=tables_filter, + page_size=self.config.effective_database_table_page_size, ): database_table = self.database_tables[ tableau_database_table_id_to_urn_map[tableau_table[c.ID]] @@ -2921,9 +3023,10 @@ def emit_sheets(self) -> Iterable[MetadataWorkUnit]: sheets_filter = {c.ID_WITH_IN: self.sheet_ids} for sheet in self.get_connection_objects( - sheet_graphql_query, - c.SHEETS_CONNECTION, - sheets_filter, + query=sheet_graphql_query, + connection_type=c.SHEETS_CONNECTION, + query_filter=sheets_filter, + page_size=self.config.effective_sheet_page_size, ): if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet): yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK)) @@ -3241,9 +3344,10 @@ def emit_dashboards(self) -> Iterable[MetadataWorkUnit]: dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids} for dashboard in self.get_connection_objects( - dashboard_graphql_query, - c.DASHBOARDS_CONNECTION, - dashboards_filter, + query=dashboard_graphql_query, + connection_type=c.DASHBOARDS_CONNECTION, + 
query_filter=dashboards_filter, + page_size=self.config.effective_dashboard_page_size, ): if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard): yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK)) @@ -3388,14 +3492,15 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]: datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used} for datasource in self.get_connection_objects( - embedded_datasource_graphql_query, - c.EMBEDDED_DATA_SOURCES_CONNECTION, - datasource_filter, + query=embedded_datasource_graphql_query, + connection_type=c.EMBEDDED_DATA_SOURCES_CONNECTION, + query_filter=datasource_filter, + page_size=self.config.effective_embedded_datasource_page_size, ): datasource = self.update_datasource_for_field_upstream( datasource=datasource, field_upstream_query=datasource_upstream_fields_graphql_query, - page_size_override=self.config.embedded_datasource_field_upstream_page_size, + page_size=self.config.effective_embedded_datasource_field_upstream_page_size, ) yield from self.emit_datasource( datasource, diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index 417496c559c436..32b1ef2ed1f835 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -69,8 +69,7 @@ "projects": ["default", "Project 2", "Samples"], "extract_project_hierarchy": False, "page_size": 1000, - "workbook_page_size": 1000, - "embedded_datasource_field_upstream_page_size": 1000, + "workbook_page_size": None, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True, @@ -646,8 +645,7 @@ def test_tableau_ingest_with_platform_instance( "platform_instance": "acryl_site1", "projects": ["default", "Project 2"], "page_size": 1000, - "workbook_page_size": 1000, - "embedded_datasource_field_upstream_page_size": 1000, + 
"workbook_page_size": None, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True, diff --git a/metadata-ingestion/tests/unit/test_tableau_source.py b/metadata-ingestion/tests/unit/test_tableau_source.py index 6763b80b28b04c..ba5e5a9832a62f 100644 --- a/metadata-ingestion/tests/unit/test_tableau_source.py +++ b/metadata-ingestion/tests/unit/test_tableau_source.py @@ -3,7 +3,11 @@ import pytest import datahub.ingestion.source.tableau.tableau_constant as c -from datahub.ingestion.source.tableau.tableau import TableauSiteSource +from datahub.ingestion.source.tableau.tableau import ( + DEFAULT_PAGE_SIZE, + TableauPageSizeConfig, + TableauSiteSource, +) from datahub.ingestion.source.tableau.tableau_common import ( TableauUpstreamReference, get_filter_pages, @@ -276,3 +280,100 @@ def test_tableau_upstream_reference(): ) except ValueError: assert True + + +class TestTableauPageSizeConfig: + def test_defaults(self): + config = TableauPageSizeConfig() + assert config.effective_database_server_page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == 1 + assert config.effective_sheet_page_size == DEFAULT_PAGE_SIZE + assert config.effective_dashboard_page_size == DEFAULT_PAGE_SIZE + assert config.effective_embedded_datasource_page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == DEFAULT_PAGE_SIZE * 10 + ) + assert config.effective_published_datasource_page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_published_datasource_field_upstream_page_size + == DEFAULT_PAGE_SIZE * 10 + ) + assert config.effective_custom_sql_table_page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_table_page_size == DEFAULT_PAGE_SIZE + + def test_page_size_fallbacks(self): + page_size = 33 + config = TableauPageSizeConfig(page_size=page_size) + assert config.effective_database_server_page_size == page_size + assert config.effective_workbook_page_size == 1 + assert 
config.effective_sheet_page_size == page_size + assert config.effective_dashboard_page_size == page_size + assert config.effective_embedded_datasource_page_size == page_size + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == page_size * 10 + ) + assert config.effective_published_datasource_page_size == page_size + assert ( + config.effective_published_datasource_field_upstream_page_size + == page_size * 10 + ) + assert config.effective_custom_sql_table_page_size == page_size + assert config.effective_database_table_page_size == page_size + + def test_fine_grained(self): + any_page_size = 55 + config = TableauPageSizeConfig(database_server_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_server_page_size == any_page_size + + config = TableauPageSizeConfig(workbook_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == any_page_size + + config = TableauPageSizeConfig(workbook_page_size=None) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == DEFAULT_PAGE_SIZE + + config = TableauPageSizeConfig(sheet_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_sheet_page_size == any_page_size + + config = TableauPageSizeConfig(dashboard_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_dashboard_page_size == any_page_size + + config = TableauPageSizeConfig(embedded_datasource_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_embedded_datasource_page_size == any_page_size + + config = TableauPageSizeConfig( + embedded_datasource_field_upstream_page_size=any_page_size + ) + assert config.page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == any_page_size + ) + + config = 
TableauPageSizeConfig(published_datasource_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_published_datasource_page_size == any_page_size + + config = TableauPageSizeConfig( + published_datasource_field_upstream_page_size=any_page_size + ) + assert config.page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_published_datasource_field_upstream_page_size + == any_page_size + ) + + config = TableauPageSizeConfig(custom_sql_table_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_custom_sql_table_page_size == any_page_size + + config = TableauPageSizeConfig(database_table_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_table_page_size == any_page_size