Skip to content

Commit

Permalink
Collections (#801)
Browse files Browse the repository at this point in the history
* compiler: support multiple adaptors when handling imports

* cli: add support for multiple adaptors on compiler

* compiler: don't extract common exports

* compiler: be more specific about how we calculate function exports

* remove the adaptors prop

* cli: working through adaptor -> adaptors changes

Not done yet

* cli: fix remaining tests after refactor

* worker: refactor to support adaptors array

* various: yet another type restructure to handle adaptors safely

* cli:type tweak

* compiler: update tests

* fix type

* engine: fix test

* tests: fix execute tests

* engine: fix another test

* worker: more typings

* tidyup monorepo in preloadAdaptorExports

* worker: automatically append the collections adaptor to steps that need it

* runtime: support global congfiguration on state assembly

* runtime: support global credentials

* worker: create global credential for collections

* fixse

* worker: accept collections version from CLI and refactor some stuff

* worker: lookup latest collections version on server start

* worker: changeset

* worker: update tests

* types

* tests: force collections token to stop lookups

* tests: add test for collections

* engine: remove .only

* typo

* worker: fix collections version

* collections: use latest rather than next

* versions

* worker: support @Local adaptor versions

* changeset

* worker: hook up monorepoDir argument

* runtime: allow a specifier to include a file path

* runtime:test

* runtime: changeset

* engine: don't try to autoinstall adaptors with an explicit path

* worker: fix env var

* worker: drive collections url from a new option

* worker: logging around collections url

* cleaner implementation of local adaptor paths

* runtime: revert linker change

* more cleanup

* update tests

* tests: integration test for worker monorepo

* fix test

* versions

* engine: fix an issue where local adaptors don't load exports properly
  • Loading branch information
josephjclark authored Oct 30, 2024
1 parent 591bcc8 commit 451a3fd
Show file tree
Hide file tree
Showing 82 changed files with 1,332 additions and 687 deletions.
10 changes: 10 additions & 0 deletions integration-tests/execute/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-execute

## 1.0.6

### Patch Changes

- Updated dependencies [3463ff9]
- Updated dependencies [7a85894]
- Updated dependencies [b6de2c4]
- @openfn/runtime@1.5.0
- @openfn/compiler@0.4.0

## 1.0.5

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/execute/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-execute",
"private": true,
"version": "1.0.5",
"version": "1.0.6",
"description": "Job execution tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
10 changes: 6 additions & 4 deletions integration-tests/execute/src/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ const execute = async (job: string, state: any, adaptor = 'common') => {
// compile with common and dumb imports
const options = {
'add-imports': {
adaptor: {
name: `@openfn/language-${adaptor}`,
exportAll: true,
},
adaptors: [
{
name: `@openfn/language-${adaptor}`,
exportAll: true,
},
],
},
};
const compiled = compiler(job, options);
Expand Down
13 changes: 13 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# @openfn/integration-tests-worker

## 1.0.63

### Patch Changes

- Updated dependencies [fd0e499]
- Updated dependencies [1c79dc1]
- Updated dependencies [7245bf7]
- Updated dependencies [b15f151]
- Updated dependencies [bcd82e9]
- @openfn/ws-worker@1.8.0
- @openfn/engine-multi@1.4.0
- @openfn/lightning-mock@2.0.21

## 1.0.62

### Patch Changes
Expand Down
8 changes: 8 additions & 0 deletions integration-tests/worker/monorepo/packages/common/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
function fortyTwo() {
return (state) => {
state.data = 42;
return state;
};
}

module.exports = { fortyTwo };
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "@openfn/language-common",
"private": true,
"version": "1.0.0",
"dependencies": {},
"devDependencies": {}
}
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.62",
"version": "1.0.63",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import createLightningServer, { toBase64 } from '@openfn/lightning-mock';
import createEngine from '@openfn/engine-multi';
import createWorkerServer from '@openfn/ws-worker';
import { createMockLogger } from '@openfn/logger';
// import createLogger from '@openfn/logger';

export const randomPort = () => Math.round(2000 + Math.random() * 1000);

Expand Down Expand Up @@ -43,6 +44,7 @@ export const initWorker = async (
port: workerPort,
lightning: `ws://localhost:${lightningPort}/worker`,
secret: crypto.randomUUID(),
collectionsVersion: '1.0.0',
...workerArgs,
});

Expand Down
26 changes: 25 additions & 1 deletion integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ test.before(async () => {
maxWorkers: 1,
repoDir: path.resolve('tmp/repo/integration'),
};
const workerArgs = { runPublicKey: keys.public };
const workerArgs = {
runPublicKey: keys.public,
monorepoDir: path.resolve('monorepo'),
};

({ worker, engine, engineLogger } = await initWorker(
lightningPort,
Expand Down Expand Up @@ -599,6 +602,27 @@ test.serial('Include timestamps on basically everything', (t) => {
});
});

test.serial('use local adaptor versions from monorepo', (t) => {
return new Promise(async (done) => {
lightning.once('run:complete', (evt) => {
const result = lightning.getResult('a1');
t.deepEqual(result, { data: 42 });
done();
});

lightning.enqueueRun({
id: 'a1',
jobs: [
{
id: 'j1',
body: 'fortyTwo()',
adaptor: '@openfn/language-common@local',
},
],
});
});
});

test.serial("Don't send adaptor logs to stdout", (t) => {
return new Promise(async (done) => {
// We have to create a new worker with a different repo for this one
Expand Down
38 changes: 36 additions & 2 deletions integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ test.before(async () => {
repoDir: path.resolve('tmp/repo/attempts'),
},
{
collectionsVersion: '1.0.0-next-f802225c',
runPublicKey: keys.public,
}
));
Expand All @@ -49,8 +50,8 @@ const run = async (t, attempt) => {
// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
payload.mem.job
)} / ${humanMb(payload.mem.system)}mb] [thread ${payload.thread_id}]`
payload.mem?.job
)} / ${humanMb(payload.mem?.system)}mb] [thread ${payload.thread_id}]`
);
});
lightning.on('run:complete', (evt) => {
Expand Down Expand Up @@ -248,3 +249,36 @@ test.serial('use different versions of the same adaptor', async (t) => {
t.log(result);
t.falsy(result.errors);
});

test.serial('Run with collections', async (t) => {
const job1 = createJob({
body: `fn((state = {}) => {
const server = collections.createMockServer();
collections.setMockClient(server);
server.api.createCollection('collection');
state.data = [{ id: 'a' }, { id: 'b' }, { id: 'c' }];
state.results = [];
return state;
});
collections.set('collection', v => v.id, $.data);
collections.each('collection', '*', (state, value, key) => {
state.results.push({ key, value })
});
`,
// Note: for some reason 1.7.0 fails because it exports a collections ??
// 1.7.4 seems fine
adaptor: '@openfn/[email protected]',
});
const attempt = createRun([], [job1], []);

const { results } = await run(t, attempt);
t.deepEqual(results, [
{ key: 'a', value: { id: 'a' } },
{ key: 'b', value: { id: 'b' } },
{ key: 'c', value: { id: 'c' } },
]);
});
1 change: 1 addition & 0 deletions integration-tests/worker/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const spawnServer = (port: string | number = 1, args: string[] = []) => {
'--backoff 0.001/0.01',
'--log debug',
'-s secretsquirrel',
'--collections-version=1.0.0',
...args,
],
options
Expand Down
11 changes: 11 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/cli

## 1.8.6

### Patch Changes

- 528e9a0: Support multiple adaptors
- Updated dependencies [3463ff9]
- Updated dependencies [7a85894]
- Updated dependencies [b6de2c4]
- @openfn/runtime@1.5.0
- @openfn/compiler@0.4.0

## 1.8.5

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.8.5",
"version": "1.8.6",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
54 changes: 26 additions & 28 deletions packages/cli/src/compile/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ const compileWorkflow = async (
const job = step as Job;
const jobOpts = {
...opts,
adaptors: job.adaptors ?? opts.adaptors,
};
if (job.adaptor) {
jobOpts.adaptors = [job.adaptor];
}
if (job.expression) {
job.expression = await compileJob(
job.expression as string,
Expand Down Expand Up @@ -115,37 +113,37 @@ export const loadTransformOptions = async (
// If an adaptor is passed in, we need to look up its declared exports
// and pass them along to the compiler
if (opts.adaptors?.length && opts.ignoreImports != true) {
let exports;
const [pattern] = opts.adaptors;
const [specifier] = pattern.split('=');

// Preload exports from a path, optionally logging errors in case of a failure
log.debug(`Trying to preload types for ${specifier}`);
const path = await resolveSpecifierPath(pattern, opts.repoDir, log);
if (path) {
try {
exports = await preloadAdaptorExports(
path,
opts.useAdaptorsMonorepo,
log
);
} catch (e) {
log.error(`Failed to load adaptor typedefs from path ${path}`);
log.error(e);
const adaptorsConfig = [];
for (const adaptorInput of opts.adaptors) {
let exports;
const [specifier] = adaptorInput.split('=');

// Preload exports from a path, optionally logging errors in case of a failure
log.debug(`Trying to preload types for ${specifier}`);
const path = await resolveSpecifierPath(adaptorInput, opts.repoDir, log);
if (path) {
try {
exports = await preloadAdaptorExports(path, log);
} catch (e) {
log.error(`Failed to load adaptor typedefs from path ${path}`);
log.error(e);
}
}
}

if (!exports || exports.length === 0) {
log.debug(`No module exports found for ${pattern}`);
}
if (!exports || exports.length === 0) {
log.debug(`No module exports found for ${adaptorInput}`);
}

options['add-imports'] = {
ignore: opts.ignoreImports as string[],
adaptor: {
adaptorsConfig.push({
name: stripVersionSpecifier(specifier),
exports,
exportAll: true,
},
});
}

options['add-imports'] = {
ignore: opts.ignoreImports as string[],
adaptors: adaptorsConfig,
};
}

Expand Down
12 changes: 7 additions & 5 deletions packages/cli/src/execute/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ export function parseAdaptors(plan: ExecutionPlan) {

const adaptors: ModuleInfoMap = {};

// TODO what if there are different versions of the same adaptor?
// This structure can't handle it - we'd need to build it for every job
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
if (job.adaptor) {
const { name, ...maybeVersionAndPath } = extractInfo(job.adaptor);
// Usually every job should have an adaptors array
// But there are a couple of cases mostly in test, when validation is skipped,
// when the array may not be set
// It's mostly redundant nbut harmless to optionally chain here
job.adaptors?.forEach((adaptor) => {
const { name, ...maybeVersionAndPath } = extractInfo(adaptor);
adaptors[name] = maybeVersionAndPath;
}
});
});

return adaptors;
Expand Down
8 changes: 5 additions & 3 deletions packages/cli/src/execute/get-autoinstall-targets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ const getAutoinstallTargets = (plan: ExecutionPlan) => {
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
// Do not autoinstall adaptors with a path
if (job.adaptor && !/=/.test(job.adaptor)) {
adaptors[job.adaptor] = true;
}
job.adaptors
?.filter((adaptor) => !/=/.test(adaptor))
.forEach((adaptor) => {
adaptors[adaptor] = true;
});
});
return Object.keys(adaptors);
};
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/execute/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
if (options.start) {
customStart = matchStep(
plan,
options.start ?? plan.options.start,
options.start ?? plan.options!.start,
'start',
logger
);
Expand All @@ -95,7 +95,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
if (options.end) {
customEnd = matchStep(
plan,
options.end ?? plan.options.end,
options.end ?? plan.options!.end,
'end',
logger
);
Expand All @@ -113,8 +113,8 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
const finalPlan = {
...plan,
options: {
...plan.options,
start: customStart || plan.options.start,
...plan.options!,
start: customStart || plan.options!.start,
end: customEnd,
},
workflow: plan.workflow,
Expand Down
Loading

0 comments on commit 451a3fd

Please sign in to comment.