Skip to content

Commit

Permalink
worker 1.82 (#818)
Browse files Browse the repository at this point in the history
* worker: allow steps to specify their own adaptor version

* worker: fix a couple of issues auto-loading the collections version

* changeset

* Readme

* version: [email protected]

* tests: fix integration test
  • Loading branch information
josephjclark authored Nov 8, 2024
1 parent e0e19b2 commit 21b66c4
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 11 deletions.
8 changes: 8 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/integration-tests-worker

## 1.0.65

### Patch Changes

- Updated dependencies [ef1fb63]
- Updated dependencies [606f23b]
- @openfn/ws-worker@1.9.0

## 1.0.64

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.64",
"version": "1.0.65",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ test.before(async () => {
repoDir: path.resolve('tmp/repo/attempts'),
},
{
collectionsVersion: '1.0.0-next-f802225c',
collectionsVersion: '0.5.0',
collectionsUrl: 'http://localhost:4321/collections',
runPublicKey: keys.public,
}
));
Expand All @@ -45,6 +46,7 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);
const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('step:complete', ({ payload }) => {
console.log(payload);
t.is(payload.reason, 'success');

// TODO friendlier job names for this would be nice (rather than run ids)
Expand Down
7 changes: 7 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# ws-worker

## 1.8.2

### Patch Changes

- ef1fb63: Fix an issue running collections from an auto-loaded version
- 606f23b: Allow steps to specify their own adaptor version

## 1.8.1

### Patch Changes
Expand Down
14 changes: 14 additions & 0 deletions packages/ws-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ To manually trigger a claim, post to `/claim`:
curl -X POST http://localhost:2222/claim
```

## Collections

To enable collections with a local lightning:

```
pnpm start -collections-url http://localhost:4000/collections
```

To use the monorepo adaptor version:

```
pnpm start --collections-version local --collections-url http://localhost:4000/collections
```

## Architecture

Lightning is expected to maintain a queue of runs. The Worker pulls those runs from the queue, via websocket, and sends them off to the Engine for execution.
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.8.1",
"version": "1.8.2",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
7 changes: 2 additions & 5 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ export interface ServerApp extends Koa {
engine: RuntimeEngine;
options: ServerOptions;
workloop?: Workloop;
// What version of the collections adaptor should we use?
// Can be set through CLI, or else it'll look up latest on startup
collectionsVersion?: string;

execute: ({ id, token }: ClaimRun) => Promise<void>;
destroy: () => void;
Expand Down Expand Up @@ -174,7 +171,7 @@ async function setupCollections(options: ServerOptions, logger: Logger) {
'npm view @openfn/language-collections@latest version'
);
logger.log('Using collections version from @latest: ', version);
return version;
return version.trim();
}

function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
Expand Down Expand Up @@ -322,7 +319,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {

if (options.lightning) {
setupCollections(options, logger).then((version) => {
app.collectionsVersion = version;
app.options.collectionsVersion = version;
connect(app, logger, options);
});
} else {
Expand Down
12 changes: 10 additions & 2 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,16 @@ export default (
const job = step as Job;
if (job.expression?.match(/(collections\.)/)) {
hasCollections = true;
job.adaptors ??= [];
job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`);
if (
!job.adaptors?.find((v) =>
v.startsWith('@openfn/language-collections')
)
) {
job.adaptors ??= [];
job.adaptors.push(
`@openfn/language-collections@${collectionsVersion}`
);
}
}
});
return hasCollections;
Expand Down
31 changes: 30 additions & 1 deletion packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,36 @@ test('append the collections adaptor to jobs that use it', (t) => {
t.deepEqual(b.adaptors, ['common', '@openfn/[email protected]']);
});

test('append the collections credential to jobs that use it', (t) => {
test('do not append the collections adaptor to jobs that already have it', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
jobs: [
createNode({
id: 'a',
body: 'collections.each("c", "k", (state) => state)',
adaptor: '@openfn/language-collections@latest',
}),
],
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('t', 'a')],
};

const { plan } = convertPlan(run as LightningPlan, {
collectionsVersion: '1.0.0',
});

const [_t, a] = plan.workflow.steps;

// @ts-ignore
t.deepEqual(a.adaptors, ['@openfn/language-collections@latest']);

t.deepEqual(plan.workflow.credentials, {
collections_token: true,
collections_endpoint: true,
});
});

test('append the collections credential to workflows that use it', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
jobs: [
Expand Down

0 comments on commit 21b66c4

Please sign in to comment.