From e904f71069ae30ae69c5510c59b6a3adb05ed31d Mon Sep 17 00:00:00 2001 From: Ilia Babanov Date: Thu, 12 Dec 2024 10:24:20 +0100 Subject: [PATCH] Add `Datasets` section with dataset schemas to the pipelines explorer (#1479) ## Changes Initially Datasets aren't loaded, but you can expand them to trigger loading Screenshot 2024-12-05 at 10 14 41 If we can't find dataset definitions in the latest runs we show a "please run or validate" item: Screenshot 2024-12-05 at 10 14 58 Screenshot 2024-12-05 at 10 15 14 Running or validating the pipeline will also show 'loading' state for the datasets section: Screenshot 2024-12-05 at 10 28 09 ## Tests Unit and e2e tests --- .../src/bundle/BundlePipelinesManager.test.ts | 127 ++++++++++++++++-- .../src/bundle/BundlePipelinesManager.ts | 101 +++++++++----- .../src/bundle/run/PipelineRunStatus.ts | 36 +++-- packages/databricks-vscode/src/extension.ts | 15 ++- .../test/e2e/deploy_and_run_pipeline.e2e.ts | 69 +++++++++- .../src/test/e2e/utils/dabsExplorerUtils.ts | 32 +++-- .../BundleResourceExplorerTreeDataProvider.ts | 9 +- .../JobRunStatusTreeNode.ts | 5 +- .../PipelineDatasetsTreeNode.ts | 124 +++++++++++++++++ .../PipelineRunStatusTreeNode.ts | 59 +++----- .../PipelineTreeNode.ts | 20 ++- .../ResourceTypeHeaderTreeNode.ts | 7 +- .../TaskRunStatusTreeNode.ts | 4 +- .../src/ui/bundle-resource-explorer/types.ts | 6 +- .../utils/JobRunStateUtils.ts | 48 ------- .../utils/RunStateUtils.ts | 104 +++++++++++++- .../bundle-resource-explorer/utils/index.ts | 1 - 17 files changed, 581 insertions(+), 186 deletions(-) create mode 100644 packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineDatasetsTreeNode.ts delete mode 100644 packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/JobRunStateUtils.ts diff --git a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts index 9fab23c18..056840d60 100644 --- 
a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts +++ b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts @@ -54,9 +54,21 @@ describe("BundlePipelinesManager", () => { const firstRun = { data: {creation_time: 10}, events: [ - {origin: {dataset_name: "table1"}}, - {origin: {not_a_dataset_name: "table1.5"}}, - {origin: {dataset_name: "table2"}}, + { + origin: {dataset_name: "table1"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, + { + origin: {not_a_dataset_name: "table1.5"}, + }, + { + origin: {dataset_name: "table2"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, + { + origin: {dataset_name: "table2.5"}, + details: {dataset_definition: {dataset_type: "VIEW"}}, + }, ], }; /* eslint-enable @typescript-eslint/naming-convention */ @@ -77,9 +89,17 @@ describe("BundlePipelinesManager", () => { refresh_selection: ["table3", "table4"], }, events: [ - {origin: {dataset_name: "table3"}}, - {origin: {not_a_dataset_name: "table3.5"}}, - {origin: {dataset_name: "table4"}}, + { + origin: {dataset_name: "table3"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, + { + origin: {not_a_dataset_name: "table3.5"}, + }, + { + origin: {dataset_name: "table4"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, ], }; /* eslint-enable @typescript-eslint/naming-convention */ @@ -103,9 +123,17 @@ describe("BundlePipelinesManager", () => { state: "RUNNING", }, events: [ - {origin: {dataset_name: "table_new"}}, - {origin: {not_a_dataset_name: "not a table"}}, - {origin: {dataset_name: "table_final"}}, + { + origin: {dataset_name: "table_new"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, + { + origin: {not_a_dataset_name: "not a table"}, + }, + { + origin: {dataset_name: "table_final"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, ], }; /* eslint-enable @typescript-eslint/naming-convention */ @@ -126,9 +154,17 @@ describe("BundlePipelinesManager", () => { 
state: "COMPLETED", }, events: [ - {origin: {dataset_name: "table_new"}}, - {origin: {not_a_dataset_name: "not a table"}}, - {origin: {dataset_name: "table_final"}}, + { + origin: {dataset_name: "table_new"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, + { + origin: {not_a_dataset_name: "not a table"}, + }, + { + origin: {dataset_name: "table_final"}, + details: {dataset_definition: {dataset_type: "TABLE"}}, + }, ], }; /* eslint-enable @typescript-eslint/naming-convention */ @@ -137,12 +173,77 @@ describe("BundlePipelinesManager", () => { await clock.runToLastAsync(); // Only the datasets from the final full-refresh run should be left - datasets = manager.getDatasets("pipeline1"); + datasets = manager.getDatasets("pipelines.pipeline1"); assert.strictEqual(datasets.size, 2); assert(datasets.has("table_new")); assert(datasets.has("table_final")); }); + it("should extract pipeline schemas from run events", async () => { + const remoteState = {resources: {pipelines: {pipeline1: {}}}}; + when(configModel.get("remoteStateConfig")).thenResolve(remoteState); + const runStatuses = new Map(); + when(runStatusManager.runStatuses).thenReturn(runStatuses); + + /* eslint-disable @typescript-eslint/naming-convention */ + const firstRun = { + data: {creation_time: 10}, + events: [ + { + origin: {dataset_name: "table1"}, + details: { + dataset_definition: { + dataset_type: "TABLE", + schema: [ + {name: "col1", data_type: "STRING"}, + {name: "col2", not_a_data_type: "INTEGER"}, + ], + }, + }, + }, + { + origin: {not_a_dataset_name: "table1.5"}, + }, + { + origin: {dataset_name: "table2"}, + details: { + dataset_definition: {dataset_type: "TABLE", schema: []}, + }, + }, + { + origin: {dataset_name: "table3"}, + details: { + dataset_definition: { + dataset_type: "VIEW", + schema: [{name: "col1", data_type: "STRING"}], + }, + }, + }, + ], + }; + /* eslint-enable @typescript-eslint/naming-convention */ + runStatuses.set("pipelines.pipeline1", firstRun); + + 
eventEmitter.fire(); + await clock.runToLastAsync(); + + const schemas = manager.getSchemas("pipelines.pipeline1"); + assert.strictEqual(schemas.size, 2); + assert.deepStrictEqual(schemas.get("table1"), { + name: "table1", + type: "TABLE", + schema: [ + {name: "col1", type: "STRING"}, + {name: "col2", type: ""}, + ], + }); + assert.deepStrictEqual(schemas.get("table3"), { + name: "table3", + type: "VIEW", + schema: [{name: "col1", type: "STRING"}], + }); + }); + describe("locationToRange", () => { it("should return correct range for a given location in a text file", async () => { const uri = Uri.file("/path/to/file.py"); diff --git a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts index e24b6cf4c..5f9999784 100644 --- a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts +++ b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts @@ -40,13 +40,22 @@ type RunState = { events: PipelineEvent[] | undefined; }; -type PipelineState = { - key: string; +export type DatasetWithSchema = { + name: string; + type: string; + schema: Array<{name: string; type: string}>; +}; + +type ResolvedPipelineState = { datasets: Set; - runs: Set; + schemas: Map; }; +type PreloadedPipelineState = Promise; -type PreloadedPipelineState = Promise | undefined>; +type PipelineState = { + key: string; + runs: Set; +} & ResolvedPipelineState; type Pick = QuickPickItem & {isDataset?: boolean}; @@ -78,6 +87,7 @@ export class BundlePipelinesManager { this.configModel.onDidChangeTarget(() => { this.updateTriggeredPipelinesState(); this.updateDiagnostics(); + this.preloadedState.clear(); }), this.configModel.onDidChangeKey("remoteStateConfig")(async () => { this.updateTriggeredPipelinesState(); @@ -104,6 +114,7 @@ export class BundlePipelinesManager { this.triggeredState.set(pipelineKey, { key: pipelineKey, datasets: new Set(), + schemas: new Map(), runs: new Set(), }); } @@ -113,7 +124,9 @@ export 
class BundlePipelinesManager { ); if (runStatus) { state.runs.add(runStatus as PipelineRunStatus); - state.datasets = extractPipelineDatasets(state.runs); + const extractedData = extractPipelineDatasets(state.runs); + state.datasets = extractedData.datasets; + state.schemas = extractedData.schemas; } }); } @@ -242,10 +255,19 @@ export class BundlePipelinesManager { } public getDatasets(pipelineKey: string) { - return this.triggeredState.get(pipelineKey)?.datasets ?? new Set(); + const key = pipelineKey.split(".")[1] ?? pipelineKey; + return this.triggeredState.get(key)?.datasets ?? new Set(); + } + + public getSchemas(pipelineKey: string) { + const key = pipelineKey.split(".")[1] ?? pipelineKey; + return ( + this.triggeredState.get(key ?? pipelineKey)?.schemas ?? + new Map() + ); } - async preloadDatasets(pipelineKey: string): PreloadedPipelineState { + public async preloadDatasets(pipelineKey: string): PreloadedPipelineState { const remoteState = await this.configModel.get("remoteStateConfig"); if (!remoteState) { return undefined; @@ -267,13 +289,13 @@ export class BundlePipelinesManager { return preloaded; } - const barrier = new Barrier>(); + const barrier = new Barrier(); this.preloadedState.set(pipelineKey, barrier.promise); try { const runs = await this.preloadUpdates(client, pipelineId); if (!runs) { - barrier.resolve(new Set()); + barrier.resolve({datasets: new Set(), schemas: new Map()}); return barrier.promise; } const listing = this.createPreloadEventsRequest( @@ -287,8 +309,10 @@ export class BundlePipelinesManager { runState.events.push(event); } } - const datasets = extractPipelineDatasets(new Set(runs.values())); - barrier.resolve(datasets); + const extractedData = extractPipelineDatasets( + new Set(runs.values()) + ); + barrier.resolve(extractedData); } catch (e) { barrier.reject(e); } @@ -360,9 +384,9 @@ export class BundlePipelinesManager { disposables ); this.preloadDatasets(key) - .then((preloadedDatasets) => { - if (preloadedDatasets && 
isUIVisible) { - for (const dataset of preloadedDatasets) { + .then((preloadedData) => { + if (preloadedData && isUIVisible) { + for (const dataset of preloadedData.datasets) { knownDatasets.add(dataset); } updateItems(ui, knownDatasets); @@ -418,7 +442,7 @@ async function confirmFullRefresh() { ); } -function isFullGraphUpdate(update?: UpdateInfo) { +export function isFullGraphUpdate(update?: UpdateInfo) { if (!update || update.state !== "COMPLETED") { return false; } @@ -429,37 +453,48 @@ function isFullGraphUpdate(update?: UpdateInfo) { ); } -// "details" is not a publicly documented field -function extractDatasetName( - event: PipelineEvent & {details?: any} -): string | undefined { - if (!event.origin?.dataset_name) { - return; - } - // VIEWs can't be used for a partial refresh (they are always refreshed) - if (event.details?.dataset_definition?.dataset_type === "VIEW") { - return; - } - return event.origin.dataset_name; -} - -function extractPipelineDatasets(runs: Set) { +function extractPipelineDatasets(runs: Set): ResolvedPipelineState { const datasets = new Set(); + const schemas = new Map(); const runsByStartTimeDesc = Array.from(runs).sort( (a, b) => (b.data?.creation_time ?? 0) - (a.data?.creation_time ?? 0) ); for (const run of runsByStartTimeDesc) { for (const event of run.events ?? []) { - const datasetName = extractDatasetName(event); - if (datasetName) { + const datasetName = event.origin?.dataset_name; + // 'details' is not documented, but it's safe to rely on if it exists + // @ts-expect-error Property 'details' does not exist + const definition = event.details?.dataset_definition; + if (!datasetName || !definition) { + continue; + } + const datasetType = definition.dataset_type ?? 
""; + if (datasetType && datasetType !== "VIEW") { datasets.add(datasetName); } + if ( + Array.isArray(definition.schema) && + definition.schema.length > 0 + ) { + const schema = definition.schema.map( + // eslint-disable-next-line @typescript-eslint/naming-convention + (field: {name?: string; data_type?: string}) => ({ + name: field.name ?? "", + type: field.data_type ?? "", + }) + ); + schemas.set(datasetName, { + name: datasetName, + type: datasetType, + schema, + }); + } } if (isFullGraphUpdate(run.data)) { break; } } - return datasets; + return {datasets, schemas}; } function createPicks(datasets: Set, manualValue?: string) { diff --git a/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts b/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts index d84fdb691..bbf4aeb1d 100644 --- a/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts +++ b/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts @@ -29,6 +29,7 @@ export class PipelineRunStatus extends BundleRunStatus { public events: pipelines.PipelineEvent[] | undefined; private logger = logging.NamedLogger.getOrCreate(Loggers.Extension); + private latestEventTimestamp: string | undefined; constructor( private readonly authProvider: AuthProvider, @@ -76,7 +77,12 @@ export class PipelineRunStatus extends BundleRunStatus { if (this.runState !== "running") { return; } - await this.updateRunData(runId); + try { + await this.updateRunData(runId); + } catch (e) { + this.logger.error("Failed to fetch run state:", e); + throw new RetriableError(); + } if (isRunning(this.data?.state)) { throw new RetriableError(); } else { @@ -98,32 +104,38 @@ export class PipelineRunStatus extends BundleRunStatus { }); this.data = getUpdateResponse.update; this.onDidChangeEmitter.fire(); - if (this.data?.creation_time !== undefined) { - this.events = await this.fetchUpdateEvents( + if (this.data?.update_id !== undefined) { + const events = await this.fetchUpdateEvents( client, - 
this.data?.creation_time, - this.data?.update_id + this.data?.update_id, + this.latestEventTimestamp ); + const latestEvent = events[events.length - 1]; + if (latestEvent?.timestamp !== undefined) { + this.latestEventTimestamp = latestEvent.timestamp; + } + this.events = this.events?.concat(events) ?? events; this.onDidChangeEmitter.fire(); } } private async fetchUpdateEvents( client: WorkspaceClient, - creationTime: number, - updateId?: string + updateId: string | undefined, + latestEventTimestamp: string | undefined ) { const events = []; - const timestamp = new Date(creationTime).toISOString(); + let filter = `update_id = '${updateId}'`; + if (latestEventTimestamp !== undefined) { + filter += ` AND timestamp > '${latestEventTimestamp}'`; + } const listEvents = client.pipelines.listPipelineEvents({ pipeline_id: this.pipelineId, order_by: ["timestamp asc"], - filter: `timestamp >= '${timestamp}'`, + filter, }); for await (const event of listEvents) { - if (!updateId || event.origin?.update_id === updateId) { - events.push(event); - } + events.push(event); } return events; } diff --git a/packages/databricks-vscode/src/extension.ts b/packages/databricks-vscode/src/extension.ts index 364bac462..ffd0c2b47 100644 --- a/packages/databricks-vscode/src/extension.ts +++ b/packages/databricks-vscode/src/extension.ts @@ -626,19 +626,20 @@ export async function activate( configModel, bundleRunTerminalManager ); + const bundlePipelinesManager = new BundlePipelinesManager( + connectionManager, + bundleRunStatusManager, + configModel + ); const bundleResourceExplorerTreeDataProvider = new BundleResourceExplorerTreeDataProvider( + context, configModel, + connectionManager, bundleRunStatusManager, - context, - connectionManager + bundlePipelinesManager ); - const bundlePipelinesManager = new BundlePipelinesManager( - connectionManager, - bundleRunStatusManager, - configModel - ); const bundleCommands = new BundleCommands( bundleRemoteStateModel, bundleRunStatusManager, diff --git 
a/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts b/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts index cf83a029b..2e3b701b2 100644 --- a/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts +++ b/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts @@ -6,9 +6,10 @@ import { waitForLogin, waitForTreeItems, } from "./utils/commonUtils.ts"; -import {CustomTreeSection, Workbench} from "wdio-vscode-service"; +import {CustomTreeSection, TreeItem, Workbench} from "wdio-vscode-service"; import {createProjectWithPipeline} from "./utils/dabsFixtures.ts"; import { + getResourceSubItems, getResourceViewItem, waitForRunStatus, } from "./utils/dabsExplorerUtils.ts"; @@ -91,4 +92,70 @@ describe("Deploy and run pipeline", async function () { 15 * 60 * 1000 ); }); + + it("should show expected run events for the sucessful run", async () => { + const runStatusItems = await getResourceSubItems( + resourceExplorerView, + "Pipelines", + pipelineName, + "Run Status" + ); + const labels: string[] = []; + for (const item of runStatusItems) { + const label = await item.getLabel(); + console.log(`Run status item: ${label}`); + labels.push(label); + } + assert(labels.includes("Start Time"), "Start Time label not found"); + assert( + labels.includes("Dataset 'test_view' defined as VIEW."), + "test_view item not found" + ); + assert( + labels.includes( + "Dataset 'test_table' defined as MATERIALIZED_VIEW." 
+ ), + "test_table item not found" + ); + }); + + it("should show expected datasets after the successful run", async () => { + const datasetItems = await getResourceSubItems( + resourceExplorerView, + "Pipelines", + pipelineName, + "Datasets" + ); + const datasets: Array<{ + label: string; + description?: string; + item: TreeItem; + }> = []; + for (const item of datasetItems) { + const label = await item.getLabel(); + console.log(`Dataset label: ${label}`); + const description = await item.getDescription(); + console.log(`Dataset description: ${description}`); + datasets.push({label, description, item}); + } + datasets.sort((a, b) => (a.label > b.label ? 1 : -1)); + assert.strictEqual(datasets.length, 2); + assert.strictEqual(datasets[0].label, "test_table"); + assert.strictEqual(datasets[0].description, "materialized view"); + assert.strictEqual(datasets[1].label, "test_view"); + assert.strictEqual(datasets[1].description, "view"); + }); + + it("should show expected schema definitions for a dataset", async () => { + const schemaItems = await getResourceSubItems( + resourceExplorerView, + "Pipelines", + pipelineName, + "Datasets", + "test_table" + ); + assert.strictEqual(schemaItems.length, 1); + assert.strictEqual(await schemaItems[0].getLabel(), "1"); + assert.strictEqual(await schemaItems[0].getDescription(), "integer"); + }); }); diff --git a/packages/databricks-vscode/src/test/e2e/utils/dabsExplorerUtils.ts b/packages/databricks-vscode/src/test/e2e/utils/dabsExplorerUtils.ts index 9c9bca754..22dc9f189 100644 --- a/packages/databricks-vscode/src/test/e2e/utils/dabsExplorerUtils.ts +++ b/packages/databricks-vscode/src/test/e2e/utils/dabsExplorerUtils.ts @@ -19,16 +19,10 @@ export async function geTaskViewItem( resourceName: string, taskName: string ) { - const jobViewItem = await getResourceViewItem( + const tasks = await getResourceSubItems( resourceExplorerView, "Workflows", - resourceName - ); - assert(jobViewItem, `Job view item with name ${resourceName} not 
found`); - - const tasks = await resourceExplorerView.openItem( - "Workflows", - await (await jobViewItem!.elem).getText(), + resourceName, "Tasks" ); for (const task of tasks) { @@ -38,6 +32,28 @@ export async function geTaskViewItem( } } +export async function getResourceSubItems( + resourceExplorerView: CustomTreeSection, + resourceType: "Workflows" | "Pipelines", + resourceName: string, + ...subItemNames: string[] +) { + const resourceViewItem = await getResourceViewItem( + resourceExplorerView, + resourceType, + resourceName + ); + assert( + resourceViewItem, + `Resource view item with name ${resourceName} not found` + ); + return await resourceExplorerView.openItem( + resourceType, + await (await resourceViewItem!.elem).getText(), + ...subItemNames + ); +} + export async function waitForRunStatus( resourceExplorerView: CustomTreeSection, resourceType: "Workflows" | "Pipelines", diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/BundleResourceExplorerTreeDataProvider.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/BundleResourceExplorerTreeDataProvider.ts index 94a6598e9..23bffcf6b 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/BundleResourceExplorerTreeDataProvider.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/BundleResourceExplorerTreeDataProvider.ts @@ -14,6 +14,7 @@ import {ConfigModel} from "../../configuration/models/ConfigModel"; import {BundleRunStatusManager} from "../../bundle/run/BundleRunStatusManager"; import {ConnectionManager} from "../../configuration/ConnectionManager"; import {ResourceTypeHeaderTreeNode} from "./ResourceTypeHeaderTreeNode"; +import {BundlePipelinesManager} from "../../bundle/BundlePipelinesManager"; export class BundleResourceExplorerTreeDataProvider implements TreeDataProvider @@ -27,10 +28,11 @@ export class BundleResourceExplorerTreeDataProvider > = this._onDidChangeTreeData.event; constructor( + private readonly context: ExtensionContext, 
private readonly configModel: ConfigModel, + private readonly connectionManager: ConnectionManager, private readonly bundleRunStatusManager: BundleRunStatusManager, - private readonly context: ExtensionContext, - private readonly connectionManager: ConnectionManager + private readonly pipelinesManager: BundlePipelinesManager ) { this.disposables.push( this.configModel.onDidChangeTarget(() => { @@ -60,8 +62,9 @@ export class BundleResourceExplorerTreeDataProvider } return ResourceTypeHeaderTreeNode.getRoots( this.context, - this.bundleRunStatusManager, this.connectionManager, + this.bundleRunStatusManager, + this.pipelinesManager, bundleRemoteState ); } diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts index 41cdc2713..6716a48a8 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts @@ -3,10 +3,11 @@ import { BundleResourceExplorerTreeNode, } from "./types"; import {TreeItemCollapsibleState} from "vscode"; -import {ContextUtils, JobRunStateUtils, RunStateUtils} from "./utils"; +import {ContextUtils, RunStateUtils} from "./utils"; import {jobs} from "@databricks/databricks-sdk"; import {JobRunStatus} from "../../bundle/run/JobRunStatus"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; +import {getSimplifiedJobRunState} from "./utils/RunStateUtils"; export class JobRunStatusTreeNode implements BundleResourceExplorerTreeNode { readonly type = "job_run_status"; @@ -95,7 +96,7 @@ export class JobRunStatusTreeNode implements BundleResourceExplorerTreeNode { const status = this.runMonitor?.runState === "cancelling" ? 
"Cancelling" - : JobRunStateUtils.getSimplifiedRunState(this.runDetails); + : getSimplifiedJobRunState(this.runDetails); return { label: "Run Status", diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineDatasetsTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineDatasetsTreeNode.ts new file mode 100644 index 000000000..6d90fc6c1 --- /dev/null +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineDatasetsTreeNode.ts @@ -0,0 +1,124 @@ +import { + BundleResourceExplorerTreeItem, + BundleResourceExplorerTreeNode, +} from "./types"; +import {ThemeIcon, TreeItemCollapsibleState} from "vscode"; +import {ContextUtils} from "./utils"; +import {TreeItemTreeNode} from "../TreeItemTreeNode"; +import { + BundlePipelinesManager, + DatasetWithSchema, + isFullGraphUpdate, +} from "../../bundle/BundlePipelinesManager"; +import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; +import { + getSimplifiedPipelineUpdateState, + isInLoadingState, +} from "./utils/RunStateUtils"; + +class PipelineDatasetTreeNode implements BundleResourceExplorerTreeNode { + readonly type = "pipeline_dataset"; + constructor(private readonly datasetWithSchema: DatasetWithSchema) {} + + getChildren(): BundleResourceExplorerTreeNode[] { + const children: BundleResourceExplorerTreeNode[] = []; + for (const field of this.datasetWithSchema.schema) { + children.push( + new TreeItemTreeNode( + { + label: field.name, + description: toDescription(field.type), + contextValue: "dataset_field", + }, + this + ) + ); + } + return children; + } + + getTreeItem(): BundleResourceExplorerTreeItem { + return { + label: this.datasetWithSchema.name, + description: toDescription(this.datasetWithSchema.type), + contextValue: ContextUtils.getContextString({ + nodeType: this.type, + }), + collapsibleState: + this.datasetWithSchema.schema.length >= 0 + ? 
TreeItemCollapsibleState.Collapsed + : TreeItemCollapsibleState.None, + }; + } +} + +export class PipelineDatasetsTreeNode + implements BundleResourceExplorerTreeNode +{ + readonly type = "pipeline_datasets"; + + constructor( + private readonly pipelineKey: string, + private readonly pipelinesManager: BundlePipelinesManager, + private readonly runMonitor?: PipelineRunStatus, + public parent?: BundleResourceExplorerTreeNode + ) {} + + async getChildren(): Promise { + const schemas = this.pipelinesManager.getSchemas(this.pipelineKey); + + // Preload and merge schemas from the past runs if the current run is not a full graph update (or doesn't exist) + // Note that getChildren can be called frequently, but we only preload the schemas once, and later on just return cached results. + if (!isFullGraphUpdate(this.runMonitor?.data)) { + const preloadedData = await this.pipelinesManager.preloadDatasets( + this.pipelineKey.split(".")[1] + ); + const preloadedSchemas = preloadedData?.schemas ?? new Map(); + for (const [key, value] of preloadedSchemas) { + if (!schemas.has(key)) { + schemas.set(key, value); + } + } + } + + if (schemas.size === 0) { + const status = getSimplifiedPipelineUpdateState( + this.runMonitor?.data + ); + if (isInLoadingState(this.runMonitor, status)) { + return [new TreeItemTreeNode({label: "Loading..."}, this)]; + } else { + return [ + new TreeItemTreeNode( + {label: "Run or validate the pipeline to see datasets"}, + this + ), + ]; + } + } + + const children: BundleResourceExplorerTreeNode[] = []; + for (const [, datasetWithSchema] of schemas) { + children.push(new PipelineDatasetTreeNode(datasetWithSchema)); + } + return children; + } + + getTreeItem(): BundleResourceExplorerTreeItem { + const status = getSimplifiedPipelineUpdateState(this.runMonitor?.data); + return { + label: "Datasets", + iconPath: isInLoadingState(this.runMonitor, status) + ? 
new ThemeIcon("loading~spin") + : new ThemeIcon("database"), + contextValue: ContextUtils.getContextString({ + nodeType: this.type, + }), + collapsibleState: TreeItemCollapsibleState.Collapsed, + }; + } +} + +function toDescription(str: string) { + return str.toLowerCase().replace(/_/g, " "); +} diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts index a5d7bec2e..f50dc068b 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts @@ -3,43 +3,17 @@ import { BundleResourceExplorerTreeNode, } from "./types"; import {ThemeIcon, TreeItemCollapsibleState} from "vscode"; -import {ContextUtils, RunStateUtils} from "./utils"; -import {SimplifiedRunState} from "./utils/RunStateUtils"; -import {GetUpdateResponse} from "@databricks/databricks-sdk/dist/apis/pipelines"; +import {ContextUtils} from "./utils"; import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; import {ConnectionManager} from "../../configuration/ConnectionManager"; import {PipelineEventTreeNode} from "./PipelineEventTreeNode"; - -function getSimplifiedUpdateState( - update?: GetUpdateResponse["update"] -): SimplifiedRunState { - if (update?.state === undefined) { - return "Unknown"; - } - - switch (update.state) { - case "RESETTING": - case "CREATED": - case "QUEUED": - case "INITIALIZING": - case "SETTING_UP_TABLES": - case "WAITING_FOR_RESOURCES": - return "Pending"; - case "RUNNING": - return "Running"; - case "COMPLETED": - return "Success"; - case "FAILED": - return "Failed"; - case "CANCELED": - return "Cancelled"; - case "STOPPING": - return "Terminating"; - default: - return "Unknown"; - } -} +import { + getSimplifiedPipelineUpdateState, + getThemeIconForStatus, + 
getTreeItemFromRunMonitorStatus, + humaniseDate, +} from "./utils/RunStateUtils"; export class PipelineRunStatusTreeNode implements BundleResourceExplorerTreeNode @@ -82,9 +56,7 @@ export class PipelineRunStatusTreeNode { label: "Start Time", iconPath: new ThemeIcon("watch"), - description: RunStateUtils.humaniseDate( - this.update.creation_time - ), + description: humaniseDate(this.update.creation_time), contextValue: "start_time", }, this @@ -100,12 +72,11 @@ export class PipelineRunStatusTreeNode } getTreeItem(): BundleResourceExplorerTreeItem { - const runMonitorRunStateTreeItem = - RunStateUtils.getTreeItemFromRunMonitorStatus( - this.type, - this.url, - this.runMonitor - ); + const runMonitorRunStateTreeItem = getTreeItemFromRunMonitorStatus( + this.type, + this.url, + this.runMonitor + ); if (runMonitorRunStateTreeItem) { return runMonitorRunStateTreeItem; @@ -114,11 +85,11 @@ export class PipelineRunStatusTreeNode const status = this.runMonitor.runState === "cancelling" ? "Cancelling" - : getSimplifiedUpdateState(this.update); + : getSimplifiedPipelineUpdateState(this.update); return { label: "Run Status", - iconPath: RunStateUtils.getThemeIconForStatus(status), + iconPath: getThemeIconForStatus(status), description: status, contextValue: ContextUtils.getContextString({ nodeType: this.type, diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts index ba8931975..cebcdad2b 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts @@ -13,6 +13,8 @@ import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; import {PipelineRunStatusTreeNode} from "./PipelineRunStatusTreeNode"; import {ThemeIcon} from "vscode"; +import {PipelineDatasetsTreeNode} from 
"./PipelineDatasetsTreeNode"; +import {BundlePipelinesManager} from "../../bundle/BundlePipelinesManager"; export class PipelineTreeNode implements BundleResourceExplorerTreeNode { readonly type = "pipelines"; @@ -27,8 +29,9 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode { } constructor( - private readonly bundleRunStatusManager: BundleRunStatusManager, private readonly connectionManager: ConnectionManager, + private readonly bundleRunStatusManager: BundleRunStatusManager, + private readonly pipelinesManager: BundlePipelinesManager, public readonly resourceKey: string, public readonly data: BundleResourceExplorerResource<"pipelines">, public parent?: BundleResourceExplorerTreeNode @@ -96,6 +99,15 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode { ); } + children.push( + new PipelineDatasetsTreeNode( + this.resourceKey, + this.pipelinesManager, + runMonitor, + this + ) + ); + if (runMonitor) { children.push( new PipelineRunStatusTreeNode( @@ -110,8 +122,9 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode { } static getRoots( - bundleRunStatusManager: BundleRunStatusManager, connectionManager: ConnectionManager, + bundleRunStatusManager: BundleRunStatusManager, + pipelinesManager: BundlePipelinesManager, remoteStateConfig: BundleRemoteState ): BundleResourceExplorerTreeNode[] { const pipelines = remoteStateConfig?.resources?.pipelines; @@ -121,8 +134,9 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode { return Object.keys(pipelines).map((pipelineKey) => { return new PipelineTreeNode( - bundleRunStatusManager, connectionManager, + bundleRunStatusManager, + pipelinesManager, `pipelines.${pipelineKey}`, pipelines[pipelineKey], undefined diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/ResourceTypeHeaderTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/ResourceTypeHeaderTreeNode.ts index 234e59300..76907c00d 100644 --- 
a/packages/databricks-vscode/src/ui/bundle-resource-explorer/ResourceTypeHeaderTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/ResourceTypeHeaderTreeNode.ts @@ -9,6 +9,7 @@ import { } from "./types"; import {ExtensionContext, TreeItemCollapsibleState} from "vscode"; import {PipelineTreeNode} from "./PipelineTreeNode"; +import {BundlePipelinesManager} from "../../bundle/BundlePipelinesManager"; function humaniseResourceType(type: BundleResourceExplorerTreeNode["type"]) { switch (type) { @@ -69,8 +70,9 @@ export class ResourceTypeHeaderTreeNode static getRoots( context: ExtensionContext, - bundleRunStatusManager: BundleRunStatusManager, connectionManager: ConnectionManager, + bundleRunStatusManager: BundleRunStatusManager, + pipelinesManager: BundlePipelinesManager, bundleRemoteState: BundleRemoteState ) { const roots: BundleResourceExplorerTreeNode[] = []; @@ -86,8 +88,9 @@ export class ResourceTypeHeaderTreeNode } const pipelines = PipelineTreeNode.getRoots( - bundleRunStatusManager, connectionManager, + bundleRunStatusManager, + pipelinesManager, bundleRemoteState ); if (pipelines.length > 0) { diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/TaskRunStatusTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/TaskRunStatusTreeNode.ts index f5cb10f79..979b3f265 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/TaskRunStatusTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/TaskRunStatusTreeNode.ts @@ -3,7 +3,7 @@ import { BundleResourceExplorerTreeNode, } from "./types"; import {TreeItemCollapsibleState} from "vscode"; -import {ContextUtils, JobRunStateUtils, RunStateUtils} from "./utils"; +import {ContextUtils, RunStateUtils} from "./utils"; import {ConnectionManager} from "../../configuration/ConnectionManager"; import {jobs} from "@databricks/databricks-sdk"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; @@ -78,7 +78,7 @@ export class 
TaskRunStatusTreeNode implements BundleResourceExplorerTreeNode { } getTreeItem(): BundleResourceExplorerTreeItem { - const status = JobRunStateUtils.getSimplifiedRunState(this.runDetails); + const status = RunStateUtils.getSimplifiedJobRunState(this.runDetails); const icon = RunStateUtils.getThemeIconForStatus(status); return { label: "Run Status", diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/types.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/types.ts index 2efe6a979..9b49b1ad6 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/types.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/types.ts @@ -17,11 +17,15 @@ export interface BundleResourceExplorerTreeNode { | "pipeline_run_status" | "pipeline_run_events" | "pipeline_run_event" + | "pipeline_datasets" + | "pipeline_dataset" | "resource_type_header" | "task" | "job_run_status" | "task_header"; parent?: BundleResourceExplorerTreeNode; getTreeItem(): BundleResourceExplorerTreeItem; - getChildren(): BundleResourceExplorerTreeNode[]; + getChildren(): + | BundleResourceExplorerTreeNode[] + | Promise<BundleResourceExplorerTreeNode[]>; } diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/JobRunStateUtils.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/JobRunStateUtils.ts deleted file mode 100644 index bcbe3b4d8..000000000 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/JobRunStateUtils.ts +++ /dev/null @@ -1,48 +0,0 @@ -import {Run} from "@databricks/databricks-sdk/dist/apis/jobs"; -import {SimplifiedRunState} from "./RunStateUtils"; - -export function getSimplifiedRunState(run?: Run): SimplifiedRunState { - if (run?.state?.life_cycle_state === undefined) { - return "Unknown"; - } - - switch (run.state.life_cycle_state) { - case "INTERNAL_ERROR": - return "Failed"; - case "SKIPPED": - return "Skipped"; - case "WAITING_FOR_RETRY": - case "BLOCKED": - case "PENDING": - return "Pending"; - case
"RUNNING": - if (run.state.user_cancelled_or_timedout) { - return "Terminating"; - } - return "Running"; - case "TERMINATING": - return "Terminating"; - case "TERMINATED": - if (run.state.user_cancelled_or_timedout) { - return "Cancelled"; - } - switch (run.state.result_state) { - case "SUCCESS": - case "SUCCESS_WITH_FAILURES": - return "Success"; - case "MAXIMUM_CONCURRENT_RUNS_REACHED": - case "FAILED": - case "TIMEDOUT": - return "Failed"; - case "UPSTREAM_CANCELED": - case "UPSTREAM_FAILED": - case "EXCLUDED": - return "Skipped"; - case "CANCELED": - return "Cancelled"; - } - return "Terminated"; - } - - return "Unknown"; -} diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts index f438db54b..3e5e8ca9d 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts @@ -6,6 +6,8 @@ import { BundleResourceExplorerTreeItem, BundleResourceExplorerTreeNode, } from "../types"; +import {GetUpdateResponse} from "@databricks/databricks-sdk/dist/apis/pipelines"; +import {Run} from "@databricks/databricks-sdk/dist/apis/jobs"; export type SimplifiedRunState = | "Terminated" @@ -56,7 +58,7 @@ export function getThemeIconForStatus(status: SimplifiedRunState): ThemeIcon { return new ThemeIcon("testing-skipped-icon"); case "Pending": case "Running": - return new ThemeIcon("sync~spin", new ThemeColor("charts.green")); + return new ThemeIcon("loading~spin"); case "Cancelling": case "Terminating": return new ThemeIcon("sync-ignored", new ThemeColor("charts.red")); @@ -75,6 +77,100 @@ export function getThemeIconForStatus(status: SimplifiedRunState): ThemeIcon { } } +export function getSimplifiedPipelineUpdateState( + update?: GetUpdateResponse["update"] +): SimplifiedRunState { + if (update?.state === undefined) { + return "Unknown"; + } 
+ switch (update.state) { + case "RESETTING": + case "CREATED": + case "QUEUED": + case "INITIALIZING": + case "SETTING_UP_TABLES": + case "WAITING_FOR_RESOURCES": + return "Pending"; + case "RUNNING": + return "Running"; + case "COMPLETED": + return "Success"; + case "FAILED": + return "Failed"; + case "CANCELED": + return "Cancelled"; + case "STOPPING": + return "Terminating"; + default: + return "Unknown"; + } +} + +export function getSimplifiedJobRunState(run?: Run): SimplifiedRunState { + if (run?.state?.life_cycle_state === undefined) { + return "Unknown"; + } + + switch (run.state.life_cycle_state) { + case "INTERNAL_ERROR": + return "Failed"; + case "SKIPPED": + return "Skipped"; + case "WAITING_FOR_RETRY": + case "BLOCKED": + case "PENDING": + return "Pending"; + case "RUNNING": + if (run.state.user_cancelled_or_timedout) { + return "Terminating"; + } + return "Running"; + case "TERMINATING": + return "Terminating"; + case "TERMINATED": + if (run.state.user_cancelled_or_timedout) { + return "Cancelled"; + } + switch (run.state.result_state) { + case "SUCCESS": + case "SUCCESS_WITH_FAILURES": + return "Success"; + case "MAXIMUM_CONCURRENT_RUNS_REACHED": + case "FAILED": + case "TIMEDOUT": + return "Failed"; + case "UPSTREAM_CANCELED": + case "UPSTREAM_FAILED": + case "EXCLUDED": + return "Skipped"; + case "CANCELED": + return "Cancelled"; + } + return "Terminated"; + } + + return "Unknown"; +} + +export function isInLoadingState( + runMonitor?: BundleRunStatus, + runState?: SimplifiedRunState +): boolean { + return ( + isInFirstLoadState(runMonitor) || + runState === "Running" || + runState === "Pending" + ); +} + +export function isInFirstLoadState(runMonitor?: BundleRunStatus): boolean { + return ( + (runMonitor?.runState === "running" || + runMonitor?.runState === "unknown") && + !runMonitor.data + ); +} + export function getTreeItemFromRunMonitorStatus( type: BundleResourceExplorerTreeNode["type"], url?: string, @@ -115,11 +211,7 @@ export function 
getTreeItemFromRunMonitorStatus( }; } - if ( - (runMonitor?.runState === "running" || - runMonitor?.runState === "unknown") && - !runMonitor.data - ) { + if (isInFirstLoadState(runMonitor)) { return { label: "Run Status", iconPath: new ThemeIcon("loading~spin"), diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/index.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/index.ts index a8d029386..24cf4760b 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/index.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/index.ts @@ -1,3 +1,2 @@ -export * as JobRunStateUtils from "./JobRunStateUtils"; export * as RunStateUtils from "./RunStateUtils"; export * as ContextUtils from "./ContextUtils";