Skip to content

Commit

Permalink
Add Datasets section with dataset schemas to the pipelines explorer (
Browse files Browse the repository at this point in the history
…#1479)

## Changes
Initially Datasets aren't loaded, but you can expand them to trigger
loading
<img width="507" alt="Screenshot 2024-12-05 at 10 14 41"
src="https://github.com/user-attachments/assets/59d39eba-ded7-409a-b6df-54569622a1cb">

If we can't find dataset definitions in the latest runs, we show a "please
run or validate" item:
<img width="507" alt="Screenshot 2024-12-05 at 10 14 58"
src="https://github.com/user-attachments/assets/889a3285-0d5c-4ea8-b70a-a9476fc9449e">

<img width="507" alt="Screenshot 2024-12-05 at 10 15 14"
src="https://github.com/user-attachments/assets/4afcb134-8fa0-495c-956b-873a60c80215">

Running or validating the pipeline will also show 'loading' state for
the datasets section:

<img width="507" alt="Screenshot 2024-12-05 at 10 28 09"
src="https://github.com/user-attachments/assets/3618ccc8-4c70-4841-a6fc-749c2f95497e">






## Tests
Unit and e2e tests
  • Loading branch information
ilia-db authored Dec 12, 2024
1 parent 27f8a9b commit e904f71
Show file tree
Hide file tree
Showing 17 changed files with 581 additions and 186 deletions.
127 changes: 114 additions & 13 deletions packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,21 @@ describe("BundlePipelinesManager", () => {
const firstRun = {
data: {creation_time: 10},
events: [
{origin: {dataset_name: "table1"}},
{origin: {not_a_dataset_name: "table1.5"}},
{origin: {dataset_name: "table2"}},
{
origin: {dataset_name: "table1"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
{
origin: {not_a_dataset_name: "table1.5"},
},
{
origin: {dataset_name: "table2"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
{
origin: {dataset_name: "table2.5"},
details: {dataset_definition: {dataset_type: "VIEW"}},
},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
Expand All @@ -77,9 +89,17 @@ describe("BundlePipelinesManager", () => {
refresh_selection: ["table3", "table4"],
},
events: [
{origin: {dataset_name: "table3"}},
{origin: {not_a_dataset_name: "table3.5"}},
{origin: {dataset_name: "table4"}},
{
origin: {dataset_name: "table3"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
{
origin: {not_a_dataset_name: "table3.5"},
},
{
origin: {dataset_name: "table4"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
Expand All @@ -103,9 +123,17 @@ describe("BundlePipelinesManager", () => {
state: "RUNNING",
},
events: [
{origin: {dataset_name: "table_new"}},
{origin: {not_a_dataset_name: "not a table"}},
{origin: {dataset_name: "table_final"}},
{
origin: {dataset_name: "table_new"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
{
origin: {not_a_dataset_name: "not a table"},
},
{
origin: {dataset_name: "table_final"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
Expand All @@ -126,9 +154,17 @@ describe("BundlePipelinesManager", () => {
state: "COMPLETED",
},
events: [
{origin: {dataset_name: "table_new"}},
{origin: {not_a_dataset_name: "not a table"}},
{origin: {dataset_name: "table_final"}},
{
origin: {dataset_name: "table_new"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
{
origin: {not_a_dataset_name: "not a table"},
},
{
origin: {dataset_name: "table_final"},
details: {dataset_definition: {dataset_type: "TABLE"}},
},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
Expand All @@ -137,12 +173,77 @@ describe("BundlePipelinesManager", () => {
await clock.runToLastAsync();

// Only the datasets from the final full-refresh run should be left
datasets = manager.getDatasets("pipeline1");
datasets = manager.getDatasets("pipelines.pipeline1");
assert.strictEqual(datasets.size, 2);
assert(datasets.has("table_new"));
assert(datasets.has("table_final"));
});

it("should extract pipeline schemas from run events", async () => {
    when(configModel.get("remoteStateConfig")).thenResolve({
        resources: {pipelines: {pipeline1: {}}},
    });
    const statuses = new Map();
    when(runStatusManager.runStatuses).thenReturn(statuses);

    // Event fixtures cover the four interesting cases: a table with a
    // partially valid schema, a non-dataset event, a table with an empty
    // schema, and a view with a schema.
    /* eslint-disable @typescript-eslint/naming-convention */
    const tableWithColumns = {
        origin: {dataset_name: "table1"},
        details: {
            dataset_definition: {
                dataset_type: "TABLE",
                schema: [
                    {name: "col1", data_type: "STRING"},
                    {name: "col2", not_a_data_type: "INTEGER"},
                ],
            },
        },
    };
    const notADataset = {
        origin: {not_a_dataset_name: "table1.5"},
    };
    const tableWithEmptySchema = {
        origin: {dataset_name: "table2"},
        details: {
            dataset_definition: {dataset_type: "TABLE", schema: []},
        },
    };
    const viewWithColumns = {
        origin: {dataset_name: "table3"},
        details: {
            dataset_definition: {
                dataset_type: "VIEW",
                schema: [{name: "col1", data_type: "STRING"}],
            },
        },
    };
    const latestRun = {
        data: {creation_time: 10},
        events: [
            tableWithColumns,
            notADataset,
            tableWithEmptySchema,
            viewWithColumns,
        ],
    };
    /* eslint-enable @typescript-eslint/naming-convention */
    statuses.set("pipelines.pipeline1", latestRun);

    eventEmitter.fire();
    await clock.runToLastAsync();

    // Only datasets with a non-empty schema should be reported; fields
    // without a recognized data_type fall back to an empty type string.
    const schemas = manager.getSchemas("pipelines.pipeline1");
    assert.strictEqual(schemas.size, 2);
    assert.deepStrictEqual(schemas.get("table1"), {
        name: "table1",
        type: "TABLE",
        schema: [
            {name: "col1", type: "STRING"},
            {name: "col2", type: ""},
        ],
    });
    assert.deepStrictEqual(schemas.get("table3"), {
        name: "table3",
        type: "VIEW",
        schema: [{name: "col1", type: "STRING"}],
    });
});

describe("locationToRange", () => {
it("should return correct range for a given location in a text file", async () => {
const uri = Uri.file("/path/to/file.py");
Expand Down
101 changes: 68 additions & 33 deletions packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,22 @@ type RunState = {
events: PipelineEvent[] | undefined;
};

type PipelineState = {
key: string;
export type DatasetWithSchema = {
name: string;
type: string;
schema: Array<{name: string; type: string}>;
};

type ResolvedPipelineState = {
datasets: Set<string>;
runs: Set<RunState>;
schemas: Map<string, DatasetWithSchema>;
};
type PreloadedPipelineState = Promise<ResolvedPipelineState | undefined>;

type PreloadedPipelineState = Promise<Set<string> | undefined>;
type PipelineState = {
key: string;
runs: Set<RunState>;
} & ResolvedPipelineState;

type Pick = QuickPickItem & {isDataset?: boolean};

Expand Down Expand Up @@ -78,6 +87,7 @@ export class BundlePipelinesManager {
this.configModel.onDidChangeTarget(() => {
this.updateTriggeredPipelinesState();
this.updateDiagnostics();
this.preloadedState.clear();
}),
this.configModel.onDidChangeKey("remoteStateConfig")(async () => {
this.updateTriggeredPipelinesState();
Expand All @@ -104,6 +114,7 @@ export class BundlePipelinesManager {
this.triggeredState.set(pipelineKey, {
key: pipelineKey,
datasets: new Set(),
schemas: new Map(),
runs: new Set(),
});
}
Expand All @@ -113,7 +124,9 @@ export class BundlePipelinesManager {
);
if (runStatus) {
state.runs.add(runStatus as PipelineRunStatus);
state.datasets = extractPipelineDatasets(state.runs);
const extractedData = extractPipelineDatasets(state.runs);
state.datasets = extractedData.datasets;
state.schemas = extractedData.schemas;
}
});
}
Expand Down Expand Up @@ -242,10 +255,19 @@ export class BundlePipelinesManager {
}

/**
 * Returns the set of dataset names collected from the cached runs of the
 * given pipeline, or an empty set when nothing has been cached yet.
 *
 * Accepts either a full resource key ("pipelines.<name>") or a bare
 * pipeline name; the part after the first "." is used as the cache key.
 */
public getDatasets(pipelineKey: string) {
    const key = pipelineKey.split(".")[1] ?? pipelineKey;
    return this.triggeredState.get(key)?.datasets ?? new Set();
}

/**
 * Returns dataset schemas (keyed by dataset name) collected from the cached
 * runs of the given pipeline, or an empty map when nothing has been cached.
 *
 * Accepts either a full resource key ("pipelines.<name>") or a bare
 * pipeline name, mirroring getDatasets.
 */
public getSchemas(pipelineKey: string) {
    // `key` can never be undefined here — the indexed access already falls
    // back to `pipelineKey` — so no second `?? pipelineKey` guard is needed.
    const key = pipelineKey.split(".")[1] ?? pipelineKey;
    return (
        this.triggeredState.get(key)?.schemas ??
        new Map<string, DatasetWithSchema>()
    );
}

async preloadDatasets(pipelineKey: string): PreloadedPipelineState {
public async preloadDatasets(pipelineKey: string): PreloadedPipelineState {
const remoteState = await this.configModel.get("remoteStateConfig");
if (!remoteState) {
return undefined;
Expand All @@ -267,13 +289,13 @@ export class BundlePipelinesManager {
return preloaded;
}

const barrier = new Barrier<Set<string>>();
const barrier = new Barrier<ResolvedPipelineState>();
this.preloadedState.set(pipelineKey, barrier.promise);

try {
const runs = await this.preloadUpdates(client, pipelineId);
if (!runs) {
barrier.resolve(new Set());
barrier.resolve({datasets: new Set(), schemas: new Map()});
return barrier.promise;
}
const listing = this.createPreloadEventsRequest(
Expand All @@ -287,8 +309,10 @@ export class BundlePipelinesManager {
runState.events.push(event);
}
}
const datasets = extractPipelineDatasets(new Set(runs.values()));
barrier.resolve(datasets);
const extractedData = extractPipelineDatasets(
new Set(runs.values())
);
barrier.resolve(extractedData);
} catch (e) {
barrier.reject(e);
}
Expand Down Expand Up @@ -360,9 +384,9 @@ export class BundlePipelinesManager {
disposables
);
this.preloadDatasets(key)
.then((preloadedDatasets) => {
if (preloadedDatasets && isUIVisible) {
for (const dataset of preloadedDatasets) {
.then((preloadedData) => {
if (preloadedData && isUIVisible) {
for (const dataset of preloadedData.datasets) {
knownDatasets.add(dataset);
}
updateItems(ui, knownDatasets);
Expand Down Expand Up @@ -418,7 +442,7 @@ async function confirmFullRefresh() {
);
}

function isFullGraphUpdate(update?: UpdateInfo) {
export function isFullGraphUpdate(update?: UpdateInfo) {
if (!update || update.state !== "COMPLETED") {
return false;
}
Expand All @@ -429,37 +453,48 @@ function isFullGraphUpdate(update?: UpdateInfo) {
);
}

// "details" is not a publicly documented field
function extractDatasetName(
    event: PipelineEvent & {details?: any}
): string | undefined {
    const name = event.origin?.dataset_name;
    if (!name) {
        return undefined;
    }
    // VIEWs can't be used for a partial refresh (they are always refreshed)
    const type = event.details?.dataset_definition?.dataset_type;
    return type === "VIEW" ? undefined : name;
}

function extractPipelineDatasets(runs: Set<RunState>) {
function extractPipelineDatasets(runs: Set<RunState>): ResolvedPipelineState {
const datasets = new Set<string>();
const schemas = new Map<string, DatasetWithSchema>();
const runsByStartTimeDesc = Array.from(runs).sort(
(a, b) => (b.data?.creation_time ?? 0) - (a.data?.creation_time ?? 0)
);
for (const run of runsByStartTimeDesc) {
for (const event of run.events ?? []) {
const datasetName = extractDatasetName(event);
if (datasetName) {
const datasetName = event.origin?.dataset_name;
// 'details' is not documented, but it's safe to rely on if it exists
// @ts-expect-error Property 'details' does not exist
const definition = event.details?.dataset_definition;
if (!datasetName || !definition) {
continue;
}
const datasetType = definition.dataset_type ?? "";
if (datasetType && datasetType !== "VIEW") {
datasets.add(datasetName);
}
if (
Array.isArray(definition.schema) &&
definition.schema.length > 0
) {
const schema = definition.schema.map(
// eslint-disable-next-line @typescript-eslint/naming-convention
(field: {name?: string; data_type?: string}) => ({
name: field.name ?? "",
type: field.data_type ?? "",
})
);
schemas.set(datasetName, {
name: datasetName,
type: datasetType,
schema,
});
}
}
if (isFullGraphUpdate(run.data)) {
break;
}
}
return datasets;
return {datasets, schemas};
}

function createPicks(datasets: Set<string>, manualValue?: string) {
Expand Down
Loading

0 comments on commit e904f71

Please sign in to comment.