From be4971ca13d9185e51a37455064ed059a79a9fee Mon Sep 17 00:00:00 2001 From: Jack Williams <1736957+jpwilliams@users.noreply.github.com> Date: Fri, 6 Oct 2023 10:22:10 +0000 Subject: [PATCH] Add handling of execution versions + step schemas when fetching via API --- packages/inngest/src/api/api.ts | 8 +- packages/inngest/src/api/schema.test.ts | 169 +++++++++++------- packages/inngest/src/api/schema.ts | 71 +++++--- .../src/components/InngestCommHandler.ts | 9 +- packages/inngest/src/helpers/functions.ts | 28 ++- 5 files changed, 168 insertions(+), 117 deletions(-) diff --git a/packages/inngest/src/api/api.ts b/packages/inngest/src/api/api.ts index a4a793b6a..5e5c017b9 100644 --- a/packages/inngest/src/api/api.ts +++ b/packages/inngest/src/api/api.ts @@ -1,11 +1,12 @@ import { type fetch } from "cross-fetch"; +import { type ExecutionVersion } from "../components/execution/InngestExecution"; import { getFetch } from "../helpers/env"; import { hashSigningKey } from "../helpers/strings"; import { err, ok, type Result } from "../types"; import { batchSchema, errorSchema, - stepsSchema, + stepsSchemas, type BatchResponse, type ErrorResponse, type StepsResponse, @@ -46,7 +47,8 @@ export class InngestApi { } async getRunSteps( - runId: string + runId: string, + version: ExecutionVersion ): Promise> { const url = new URL(`/v0/runs/${runId}/actions`, this.baseUrl); @@ -57,7 +59,7 @@ export class InngestApi { const data: unknown = await resp.json(); if (resp.ok) { - return ok(stepsSchema.parse(data)); + return ok(stepsSchemas[version].parse(data)); } else { return err(errorSchema.parse(data)); } diff --git a/packages/inngest/src/api/schema.test.ts b/packages/inngest/src/api/schema.test.ts index b8b9c9e5d..2f508b381 100644 --- a/packages/inngest/src/api/schema.test.ts +++ b/packages/inngest/src/api/schema.test.ts @@ -1,84 +1,121 @@ -import { stepsSchema } from "./schema"; - -describe("stepsSchema", () => { - test("handles v1 { data } objects", () => { - const expected = { - id: { - type: "data", - data: "something", - }, - }; - - const actual = stepsSchema.safeParse({ - id: { - data: "something", - }, +import { stepsSchemas } from "@local/api/schema"; +import { ExecutionVersion } from "@local/components/execution/InngestExecution"; + +describe("stepsSchemas", () => { + describe("v0", () => { + const schema = stepsSchemas[ExecutionVersion.V0]; + + test("handles any data", () => { + const expected = { + id: "something", + id2: "something else", + id3: true, + id4: { data: false }, + }; + + const actual = schema.safeParse({ + id: "something", + id2: "something else", + id3: true, + id4: { data: false }, + }); + + expect(actual.success).toBe(true); + expect(actual.success && actual.data).toEqual(expected); }); - expect(actual.success).toBe(true); - expect(actual.success && actual.data).toEqual(expected); + test("throws if finding undefined value", () => { + const result = schema.safeParse({ + id: "something", + id2: undefined, + }); + + expect(result.success).toBe(false); + }); }); - test("handles v1 { error } objects", () => { - const expected = { - id: { - type: "error", - error: { - name: "Error", - message: "something", + describe("v1", () => { + const schema = stepsSchemas[ExecutionVersion.V1]; + + test("handles v1 { data } objects", () => { + const expected = { + id: { + type: "data", + data: "something", }, - }, - }; - - const actual = stepsSchema.safeParse({ - id: { - error: { - name: "Error", - message: "something", + }; + + const actual = schema.safeParse({ + id: { + data: "something", }, - }, + }); + + expect(actual.success).toBe(true); + expect(actual.success && actual.data).toEqual(expected); }); - expect(actual.success).toBe(true); - expect(actual.success && actual.data).toEqual(expected); - }); - test("handles null from a v0 or v1 sleep or waitForEvent", () => { - const expected = { - id: { - type: "data", - data: null, - }, - }; - - const actual = stepsSchema.safeParse({ - id: null, + test("handles v1 { error } objects", () => { + const expected = { + id: { + type: "error", + error: { + name: "Error", + message: "something", + }, + }, + }; + + const actual = schema.safeParse({ + id: { + error: { + name: "Error", + message: "something", + }, + }, + }); + + expect(actual.success).toBe(true); + expect(actual.success && actual.data).toEqual(expected); }); + test("handles null from a v0 or v1 sleep or waitForEvent", () => { + const expected = { + id: { + type: "data", + data: null, + }, + }; - expect(actual.success).toBe(true); - expect(actual.success && actual.data).toEqual(expected); - }); + const actual = schema.safeParse({ + id: null, + }); - test("handles event from v0 or v1 waitForEvent", () => { - const expected = { - id: { - type: "data", - data: { + expect(actual.success).toBe(true); + expect(actual.success && actual.data).toEqual(expected); + }); + + test("handles event from v0 or v1 waitForEvent", () => { + const expected = { + id: { + type: "data", + data: { + name: "event", + data: { some: "data" }, + ts: 123, + }, + }, + }; + + const actual = schema.safeParse({ + id: { name: "event", data: { some: "data" }, ts: 123, }, - }, - }; - - const actual = stepsSchema.safeParse({ - id: { - name: "event", - data: { some: "data" }, - ts: 123, - }, - }); + }); - expect(actual.success).toBe(true); - expect(actual.success && actual.data).toEqual(expected); + expect(actual.success).toBe(true); + expect(actual.success && actual.data).toEqual(expected); + }); }); }); diff --git a/packages/inngest/src/api/schema.ts b/packages/inngest/src/api/schema.ts index 08ab9475f..21bee7921 100644 --- a/packages/inngest/src/api/schema.ts +++ b/packages/inngest/src/api/schema.ts @@ -1,4 +1,5 @@ import { z } from "zod"; +import { ExecutionVersion } from "../components/execution/InngestExecution"; import { failureEventErrorSchema, type EventPayload } from "../types"; export const errorSchema = z.object({ @@ -7,38 +8,50 @@ export const errorSchema = z.object({ }); export type ErrorResponse = z.infer; -export const stepsSchema = z - .record( - z - .object({ - type: z.literal("data").optional().default("data"), - data: z.any().refine((v) => typeof v !== "undefined", { - message: "Data in steps must be defined", - }), +export const stepsSchemas = { + [ExecutionVersion.V0]: z + .record( + z.any().refine((v) => typeof v !== "undefined", { + message: "Values in steps must be defined", }) - .strict() - .or( - z - .object({ - type: z.literal("error").optional().default("error"), - error: failureEventErrorSchema, - }) - .strict() - ) + ) + .optional() + .nullable(), + [ExecutionVersion.V1]: z + .record( + z + .object({ + type: z.literal("data").optional().default("data"), + data: z.any().refine((v) => typeof v !== "undefined", { + message: "Data in steps must be defined", + }), + }) + .strict() + .or( + z + .object({ + type: z.literal("error").optional().default("error"), + error: failureEventErrorSchema, + }) + .strict() + ) - /** - * If the result isn't a distcint `data` or `error` object, then it's - * likely that the executor has set this directly to a value, for example - * in the case of `sleep` or `waitForEvent`. - * - * In this case, pull the entire value through as data. - */ - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - .or(z.any().transform((v) => ({ type: "data" as const, data: v }))) - ) - .default({}); + /** + * If the result isn't a distcint `data` or `error` object, then it's + * likely that the executor has set this directly to a value, for example + * in the case of `sleep` or `waitForEvent`. + * + * In this case, pull the entire value through as data. + */ + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + .or(z.any().transform((v) => ({ type: "data" as const, data: v }))) + ) + .default({}), +} satisfies Record; -export type StepsResponse = z.infer; +export type StepsResponse = { + [V in ExecutionVersion]: z.infer<(typeof stepsSchemas)[V]>; +}[ExecutionVersion]; export const batchSchema = z.array( z.record(z.any()).transform((v) => v as EventPayload) diff --git a/packages/inngest/src/components/InngestCommHandler.ts b/packages/inngest/src/components/InngestCommHandler.ts index 7c41b758d..39a52097d 100644 --- a/packages/inngest/src/components/InngestCommHandler.ts +++ b/packages/inngest/src/components/InngestCommHandler.ts @@ -839,10 +839,11 @@ export class InngestCommHandler< const { version } = immediateFnData; const result = runAsPromise(async () => { - const anyFnData = await fetchAllFnData( - immediateFnData, - this.client["inngestApi"] - ); + const anyFnData = await fetchAllFnData({ + data: immediateFnData, + api: this.client["inngestApi"], + version, + }); if (!anyFnData.ok) { throw new Error(anyFnData.error); } diff --git a/packages/inngest/src/helpers/functions.ts b/packages/inngest/src/helpers/functions.ts index 51aee6fb1..c110688a4 100644 --- a/packages/inngest/src/helpers/functions.ts +++ b/packages/inngest/src/helpers/functions.ts @@ -1,6 +1,6 @@ import { ZodError, z } from "zod"; import { type InngestApi } from "../api/api"; -import { stepsSchema } from "../api/schema"; +import { stepsSchemas } from "../api/schema"; import { ExecutionVersion, PREFERRED_EXECUTION_VERSION, @@ -107,14 +107,7 @@ export const parseFnData = (data: unknown) => { .object({ event: z.record(z.any()), events: z.array(z.record(z.any())).default([]), - steps: z - .record( - z.any().refine((v) => typeof v !== "undefined", { - message: "Values in steps must be defined", - }) - ) - .optional() - .nullable(), + steps: stepsSchemas[ExecutionVersion.V0], ctx: z .object({ run_id: z.string(), @@ -145,7 +138,7 @@ export const parseFnData = (data: unknown) => { .object({ event: z.record(z.any()), events: z.array(z.record(z.any())).default([]), - steps: stepsSchema, + steps: stepsSchemas[ExecutionVersion.V1], ctx: z .object({ run_id: z.string(), @@ -179,10 +172,15 @@ export const parseFnData = (data: unknown) => { export type FnData = ReturnType; type ParseErr = string; -export const fetchAllFnData = async ( - data: FnData, - api: InngestApi -): Promise> => { +export const fetchAllFnData = async ({ + data, + api, + version, +}: { + data: FnData; + api: InngestApi; + version: ExecutionVersion; +}): Promise> => { const result = { ...data }; try { @@ -203,7 +201,7 @@ export const fetchAllFnData = async ( const [evtResp, stepResp] = await Promise.all([ api.getRunBatch(result.ctx.run_id), - api.getRunSteps(result.ctx.run_id), + api.getRunSteps(result.ctx.run_id, version), ]); if (evtResp.ok) {