Skip to content

Commit

Permalink
Add Event Log section to the pipeline subtree (#1444)
Browse files Browse the repository at this point in the history
## Changes

Running state:
<img width="533" alt="Screenshot 2024-11-08 at 15 15 23"
src="https://github.com/user-attachments/assets/f518a051-6d1d-4ffb-bf4b-33af7482d1c8">

Completed state:
<img width="515" alt="Screenshot 2024-11-08 at 15 15 50"
src="https://github.com/user-attachments/assets/9f79d65d-f4de-402c-98b8-5ac447150d45">

Currently event log items aren't interactive. 

Requires this JS SDK PR to be merged first:
databricks/databricks-sdk-js#272

## Tests
Manually
  • Loading branch information
ilia-db authored Nov 13, 2024
1 parent aa8eae8 commit 3c3fc9c
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 35 deletions.
48 changes: 38 additions & 10 deletions packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {BundleRunStatus} from "./BundleRunStatus";
import {AuthProvider} from "../../configuration/auth/AuthProvider";
import {onError} from "../../utils/onErrorDecorator";
import {pipelines} from "@databricks/databricks-sdk";
import {pipelines, WorkspaceClient} from "@databricks/databricks-sdk";

function isRunning(status?: pipelines.UpdateInfoState) {
if (status === undefined) {
Expand All @@ -12,17 +12,19 @@ function isRunning(status?: pipelines.UpdateInfoState) {
}

export class PipelineRunStatus extends BundleRunStatus {
readonly type = "pipelines";
public readonly type = "pipelines";
public data: pipelines.GetUpdateResponse | undefined;
public events: pipelines.PipelineEvent[] | undefined;

private interval?: NodeJS.Timeout;
data: pipelines.GetUpdateResponse | undefined;

constructor(
private readonly authProvider: AuthProvider,
private readonly pipelineId: string
) {
super();
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
parseId(output: string): void {
if (this.runId !== undefined || this.runState !== "unknown") {
return;
Expand Down Expand Up @@ -57,26 +59,52 @@ export class PipelineRunStatus extends BundleRunStatus {
if (this.runId === undefined) {
throw new Error("No update id");
}
const update = await client.pipelines.getUpdate({
this.data = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: this.runId,
});
this.data = update;

if (this.data.update?.creation_time !== undefined) {
this.events = await this.fetchUpdateEvents(
client,
this.data.update.creation_time,
this.data.update?.update_id
);
}

// If update is completed, we stop polling.
if (!isRunning(update.update?.state)) {
if (!isRunning(this.data.update?.state)) {
this.markCompleted();
return;
} else {
this.onDidChangeEmitter.fire();
}

this.onDidChangeEmitter.fire();
} catch (e) {
this.runState = "error";
throw e;
}
}, 5_000);
}

private async fetchUpdateEvents(
client: WorkspaceClient,
creationTime: number,
updateId?: string
) {
const events = [];
const timestamp = new Date(creationTime).toISOString();
const listEvents = client.pipelines.listPipelineEvents({
pipeline_id: this.pipelineId,
order_by: ["timestamp asc"],
filter: `timestamp >= '${timestamp}'`,
});
for await (const event of listEvents) {
if (!updateId || event.origin?.update_id === updateId) {
events.push(event);
}
}
return events;
}

private markCompleted() {
if (this.interval !== undefined) {
clearInterval(this.interval);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import {
BundleResourceExplorerTreeItem,
BundleResourceExplorerTreeNode,
} from "./types";
import {ThemeColor, ThemeIcon, TreeItemCollapsibleState} from "vscode";
import {ContextUtils} from "./utils";
import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus";
import {TreeItemTreeNode} from "../TreeItemTreeNode";
import {ConnectionManager} from "../../configuration/ConnectionManager";
import {EventLevel} from "@databricks/databricks-sdk/dist/apis/pipelines";

export class PipelineRunEventsTreeNode
implements BundleResourceExplorerTreeNode
{
readonly type = "pipeline_run_events";

private get update() {
return this.runMonitor?.data?.update;
}

private get events() {
return this.runMonitor?.events;
}

public get url() {
const {host} = this.connectionManager.databricksWorkspace ?? {};
// eslint-disable-next-line @typescript-eslint/naming-convention
const {pipeline_id, update_id} = this.update ?? {};
if (!host || !pipeline_id || !update_id) {
return undefined;
}
return `${host}#joblist/pipelines/${pipeline_id}/updates/${update_id}`;
}

constructor(
private readonly connectionManager: ConnectionManager,
private readonly runMonitor: PipelineRunStatus,
public parent?: BundleResourceExplorerTreeNode
) {}

getChildren(): BundleResourceExplorerTreeNode[] {
if (this.events === undefined || this.events.length === 0) {
return [];
}
const children: BundleResourceExplorerTreeNode[] = [];

for (const event of this.events) {
children.push(
new TreeItemTreeNode(
{
label: event.message ?? event.event_type ?? "unknown",
iconPath: getEventIcon(event.level),
tooltip: event.message,
contextValue: "pipeline_event",
},
this
)
);
}

return children;
}

getTreeItem(): BundleResourceExplorerTreeItem {
if (this.events === undefined || this.events.length === 0) {
return {
label: "Event Log",
iconPath: new ThemeIcon("loading~spin"),
contextValue: ContextUtils.getContextString({
nodeType: this.type,
}),
collapsibleState: TreeItemCollapsibleState.None,
};
}

return {
label: "Event Log",
iconPath: new ThemeIcon("inbox"),
contextValue: ContextUtils.getContextString({
nodeType: this.type,
hasUrl: this.url !== undefined,
}),
collapsibleState: TreeItemCollapsibleState.Expanded,
};
}
}

function getEventIcon(level: EventLevel | undefined): ThemeIcon {
switch (level) {
case "ERROR":
return new ThemeIcon(
"error",
new ThemeColor("list.errorForeground")
);
case "INFO":
return new ThemeIcon("info");
case "METRICS":
return new ThemeIcon("dashboard");
case "WARN":
return new ThemeIcon(
"warning",
new ThemeColor("list.warningForeground")
);
default:
return new ThemeIcon("question");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {
BundleResourceExplorerTreeItem,
BundleResourceExplorerTreeNode,
} from "./types";
import {TreeItemCollapsibleState} from "vscode";
import {ThemeIcon, TreeItemCollapsibleState} from "vscode";
import {ContextUtils, RunStateUtils} from "./utils";
import {SimplifiedRunState, sentenceCase} from "./utils/RunStateUtils";
import {GetUpdateResponse} from "@databricks/databricks-sdk/dist/apis/pipelines";
Expand Down Expand Up @@ -44,24 +44,19 @@ export class PipelineRunStatusTreeNode
implements BundleResourceExplorerTreeNode
{
readonly type = "pipeline_run_status";

private get update() {
return this.runMonitor?.data?.update;
}

public get url() {
if (this.type !== this.type) {
return undefined;
}
const host = this.connectionManager.databricksWorkspace?.host;
if (
host === undefined ||
this.update?.pipeline_id === undefined ||
this.update?.update_id === undefined
) {
const {host} = this.connectionManager.databricksWorkspace ?? {};
// eslint-disable-next-line @typescript-eslint/naming-convention
const {pipeline_id, update_id} = this.update ?? {};
if (!host || !pipeline_id || !update_id) {
return undefined;
}
return `${host.toString()}#joblist/pipelines/${
this.update.pipeline_id
}/updates/${this.update.update_id}`;
return `${host}#joblist/pipelines/${pipeline_id}/updates/${update_id}`;
}

constructor(
Expand Down Expand Up @@ -122,8 +117,7 @@ export class PipelineRunStatusTreeNode
if (this.update === undefined) {
return {
label: "Run Status",
iconPath: RunStateUtils.getThemeIconForStatus("Unknown"),
description: "Run status not available",
iconPath: new ThemeIcon("loading~spin"),
contextValue: ContextUtils.getContextString({
nodeType: this.type,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {ConnectionManager} from "../../configuration/ConnectionManager";
import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus";
import {TreeItemTreeNode} from "../TreeItemTreeNode";
import {PipelineRunStatusTreeNode} from "./PipelineRunStatusTreeNode";
import {PipelineRunEventsTreeNode} from "./PipelineRunEventsTreeNode";
import {ThemeIcon} from "vscode";

export class PipelineTreeNode implements BundleResourceExplorerTreeNode {
readonly type = "pipelines";
Expand Down Expand Up @@ -67,22 +69,13 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode {
const runMonitor = this.bundleRunStatusManager.runStatuses.get(
this.resourceKey
) as PipelineRunStatus | undefined;
if (runMonitor) {
children.push(
new PipelineRunStatusTreeNode(
this.connectionManager,
runMonitor,
this
)
);
}

if (this.data.catalog) {
children.push(
new TreeItemTreeNode(
{
label: "Catalog",
description: this.data.catalog,
iconPath: new ThemeIcon("book"),
contextValue: "catalog",
},
this
Expand All @@ -96,13 +89,29 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode {
{
label: "Target",
description: this.data.target,
iconPath: new ThemeIcon("target"),
contextValue: "target",
},
this
)
);
}

if (runMonitor) {
children.push(
new PipelineRunStatusTreeNode(
this.connectionManager,
runMonitor,
this
),
new PipelineRunEventsTreeNode(
this.connectionManager,
runMonitor,
this
)
);
}

return children;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface BundleResourceExplorerTreeNode {
| "treeItem"
| "task_run_status"
| "pipeline_run_status"
| "pipeline_run_events"
| "resource_type_header"
| "task"
| "job_run_status"
Expand Down

0 comments on commit 3c3fc9c

Please sign in to comment.