Skip to content

Commit

Permalink
Add partial update to pipelines (#1453)
Browse files Browse the repository at this point in the history
## Changes
I've changed the "normal" pipeline run action to use `run-all` icon, and
the partial update run uses the `run` instead.

Currently we don't pre-load pipeline updates, so if a user hasn't
executed a pipeline from the extension, the partial update action will
not show a list of tables to select from (and users will have to type
the table names manually). After the pipeline is executed we show the
tables selection in the partial update action.

Pre-loading pipeline updates is for later.

New action:
<img width="676" alt="Screenshot 2024-11-18 at 15 31 07"
src="https://github.com/user-attachments/assets/31022043-9820-497a-a5bf-8d2b71fed24a">


The case when we don't know about past events:
<img width="896" alt="Screenshot 2024-11-15 at 11 24 52"
src="https://github.com/user-attachments/assets/6bdf0f49-399b-4657-a2d1-8207d4e8f78c">
<img width="896" alt="Screenshot 2024-11-15 at 11 25 01"
src="https://github.com/user-attachments/assets/3044ddf2-1059-4775-aa00-9a28165c1e77">

The case when we offer a selection of table names based on past runs:
<img width="896" alt="Screenshot 2024-11-15 at 11 26 28"
src="https://github.com/user-attachments/assets/e4f834c3-918a-4f0a-92ab-182305fe450c">

<img width="896" alt="Screenshot 2024-11-15 at 11 25 13"
src="https://github.com/user-attachments/assets/48c5a21f-7423-4585-99f9-62a682101ece">



## Tests
Partially covered by unit tests
  • Loading branch information
ilia-db authored Nov 19, 2024
1 parent d410b62 commit b7cbe4c
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 24 deletions.
50 changes: 41 additions & 9 deletions packages/databricks-vscode/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,28 @@
"category": "Databricks"
},
{
"command": "databricks.bundle.deployAndRun",
"icon": "$(debug-start)",
"title": "Deploy the bundle and run the resource",
"command": "databricks.bundle.deployAndRunJob",
"icon": "$(run)",
"title": "Deploy the bundle and run the workflow",
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet && databricks.context.bundle.deploymentState == idle",
"category": "Databricks"
},
{
"command": "databricks.bundle.deployAndRunPipeline",
"icon": "$(run-all)",
"title": "Deploy the bundle and run the pipeline",
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet && databricks.context.bundle.deploymentState == idle",
"category": "Databricks"
},
{
"command": "databricks.bundle.deployAndRunSelectedTables",
"icon": "$(run)",
"title": "Deploy the bundle and select pipeline tables for partial update",
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet && databricks.context.bundle.deploymentState == idle",
"category": "Databricks"
},
{
"command": "databricks.bundle.deployAndValidate",
"command": "databricks.bundle.deployAndValidatePipeline",
"icon": {
"dark": "resources/dark/check-line-icon.svg",
"light": "resources/light/check-line-icon.svg"
Expand Down Expand Up @@ -550,12 +564,22 @@
"group": "inline@0"
},
{
"command": "databricks.bundle.deployAndRun",
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.*.runnable.*$/ && databricks.context.bundle.deploymentState == idle",
"command": "databricks.bundle.deployAndRunJob",
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.resource=jobs.runnable.*$/ && databricks.context.bundle.deploymentState == idle",
"group": "inline@0"
},
{
"command": "databricks.bundle.deployAndValidate",
"command": "databricks.bundle.deployAndRunPipeline",
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.resource=pipelines.runnable.*$/ && databricks.context.bundle.deploymentState == idle",
"group": "inline@0"
},
{
"command": "databricks.bundle.deployAndValidatePipeline",
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.resource=pipelines.runnable.*$/ && databricks.context.bundle.deploymentState == idle",
"group": "inline@0"
},
{
"command": "databricks.bundle.deployAndRunSelectedTables",
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.resource=pipelines.runnable.*$/ && databricks.context.bundle.deploymentState == idle",
"group": "inline@0"
},
Expand Down Expand Up @@ -657,11 +681,19 @@
"when": "false"
},
{
"command": "databricks.bundle.deployAndRun",
"command": "databricks.bundle.deployAndRunJob",
"when": "false"
},
{
"command": "databricks.bundle.deployAndRunPipeline",
"when": "false"
},
{
"command": "databricks.bundle.deployAndValidatePipeline",
"when": "false"
},
{
"command": "databricks.bundle.deployAndValidate",
"command": "databricks.bundle.deployAndRunSelectedTables",
"when": "false"
},
{
Expand Down
119 changes: 119 additions & 0 deletions packages/databricks-vscode/src/bundle/BundlePipelinesManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {BundlePipelinesManager} from "./BundlePipelinesManager";
import {BundleRunStatusManager} from "./run/BundleRunStatusManager";
import {ConfigModel} from "../configuration/models/ConfigModel";
import {mock, instance, when} from "ts-mockito";
import assert from "assert";
import {EventEmitter} from "vscode";
import {install, InstalledClock} from "@sinonjs/fake-timers";

describe(__filename, () => {
let runStatusManager: BundleRunStatusManager;
let configModel: ConfigModel;
let manager: BundlePipelinesManager;
let eventEmitter: EventEmitter<void>;
let clock: InstalledClock;

beforeEach(() => {
clock = install();
eventEmitter = new EventEmitter();
runStatusManager = mock<BundleRunStatusManager>();
configModel = mock<ConfigModel>();
when(runStatusManager.onDidChange).thenReturn(eventEmitter.event);
when(configModel.onDidChangeKey("remoteStateConfig")).thenReturn(
new EventEmitter<void>().event
);
when(configModel.onDidChangeTarget).thenReturn(
new EventEmitter<void>().event
);
manager = new BundlePipelinesManager(
instance(runStatusManager),
instance(configModel)
);
});

afterEach(() => {
clock.uninstall();
});

it("should update pipeline datasets from run events", async () => {
let datasets;
const remoteState = {resources: {pipelines: {pipeline1: {}}}};
when(configModel.get("remoteStateConfig")).thenResolve(remoteState);
const runStatuses = new Map();
when(runStatusManager.runStatuses).thenReturn(runStatuses);

/* eslint-disable @typescript-eslint/naming-convention */
const firstRun = {
data: {
update: {creation_time: 10},
},
events: [
{origin: {dataset_name: "table1"}},
{origin: {not_a_dataset_name: "table1.5"}},
{origin: {dataset_name: "table2"}},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
runStatuses.set("pipelines.pipeline1", firstRun);

eventEmitter.fire();
await clock.runToLastAsync();

datasets = manager.getDatasets("pipeline1");
assert.strictEqual(datasets.size, 2);
assert(datasets.has("table1"));
assert(datasets.has("table2"));

/* eslint-disable @typescript-eslint/naming-convention */
const secondPartialRun = {
data: {
update: {
creation_time: 100,
refresh_selection: ["table3", "table4"],
},
},
events: [
{origin: {dataset_name: "table3"}},
{origin: {not_a_dataset_name: "table3.5"}},
{origin: {dataset_name: "table4"}},
],
};
/* eslint-enable @typescript-eslint/naming-convention */

runStatuses.set("pipelines.pipeline1", secondPartialRun);
eventEmitter.fire();
await clock.runToLastAsync();

datasets = manager.getDatasets("pipeline1");
assert.strictEqual(datasets.size, 4);
assert(datasets.has("table1"));
assert(datasets.has("table2"));
assert(datasets.has("table3"));
assert(datasets.has("table4"));

/* eslint-disable @typescript-eslint/naming-convention */
const finalFullRefreshRun = {
data: {
update: {
creation_time: 200,
refresh_selection: [],
},
},
events: [
{origin: {dataset_name: "table_new"}},
{origin: {not_a_dataset_name: "not a table"}},
{origin: {dataset_name: "table_final"}},
],
};
/* eslint-enable @typescript-eslint/naming-convention */
runStatuses.set("pipelines.pipeline1", finalFullRefreshRun);
eventEmitter.fire();
await clock.runToLastAsync();

// Only the datasets from the final full-refresh run should be left
datasets = manager.getDatasets("pipeline1");
assert.strictEqual(datasets.size, 2);
assert(datasets.has("table_new"));
assert(datasets.has("table_final"));
});
});
Loading

0 comments on commit b7cbe4c

Please sign in to comment.