Skip to content

Commit

Permalink
Add handling of execution versions + step schemas when fetching via API
Browse files Browse the repository at this point in the history
  • Loading branch information
jpwilliams committed Oct 6, 2023
1 parent 4f8776a commit be4971c
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 117 deletions.
8 changes: 5 additions & 3 deletions packages/inngest/src/api/api.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type fetch } from "cross-fetch";
import { type ExecutionVersion } from "../components/execution/InngestExecution";
import { getFetch } from "../helpers/env";
import { hashSigningKey } from "../helpers/strings";
import { err, ok, type Result } from "../types";
import {
batchSchema,
errorSchema,
stepsSchema,
stepsSchemas,
type BatchResponse,
type ErrorResponse,
type StepsResponse,
Expand Down Expand Up @@ -46,7 +47,8 @@ export class InngestApi {
}

async getRunSteps(
runId: string
runId: string,
version: ExecutionVersion
): Promise<Result<StepsResponse, ErrorResponse>> {
const url = new URL(`/v0/runs/${runId}/actions`, this.baseUrl);

Expand All @@ -57,7 +59,7 @@ export class InngestApi {
const data: unknown = await resp.json();

if (resp.ok) {
return ok(stepsSchema.parse(data));
return ok(stepsSchemas[version].parse(data));
} else {
return err(errorSchema.parse(data));
}
Expand Down
169 changes: 103 additions & 66 deletions packages/inngest/src/api/schema.test.ts
Original file line number Diff line number Diff line change
@@ -1,84 +1,121 @@
import { stepsSchema } from "./schema";

describe("stepsSchema", () => {
test("handles v1 { data } objects", () => {
const expected = {
id: {
type: "data",
data: "something",
},
};

const actual = stepsSchema.safeParse({
id: {
data: "something",
},
import { stepsSchemas } from "@local/api/schema";
import { ExecutionVersion } from "@local/components/execution/InngestExecution";

describe("stepsSchemas", () => {
describe("v0", () => {
const schema = stepsSchemas[ExecutionVersion.V0];

test("handles any data", () => {
const expected = {
id: "something",
id2: "something else",
id3: true,
id4: { data: false },
};

const actual = schema.safeParse({
id: "something",
id2: "something else",
id3: true,
id4: { data: false },
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
test("throws if finding undefined value", () => {
const result = schema.safeParse({
id: "something",
id2: undefined,
});

expect(result.success).toBe(false);
});
});

test("handles v1 { error } objects", () => {
const expected = {
id: {
type: "error",
error: {
name: "Error",
message: "something",
describe("v1", () => {
const schema = stepsSchemas[ExecutionVersion.V1];

test("handles v1 { data } objects", () => {
const expected = {
id: {
type: "data",
data: "something",
},
},
};

const actual = stepsSchema.safeParse({
id: {
error: {
name: "Error",
message: "something",
};

const actual = schema.safeParse({
id: {
data: "something",
},
},
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});
test("handles null from a v0 or v1 sleep or waitForEvent", () => {
const expected = {
id: {
type: "data",
data: null,
},
};

const actual = stepsSchema.safeParse({
id: null,
test("handles v1 { error } objects", () => {
const expected = {
id: {
type: "error",
error: {
name: "Error",
message: "something",
},
},
};

const actual = schema.safeParse({
id: {
error: {
name: "Error",
message: "something",
},
},
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});
test("handles null from a v0 or v1 sleep or waitForEvent", () => {
const expected = {
id: {
type: "data",
data: null,
},
};

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});
const actual = schema.safeParse({
id: null,
});

test("handles event from v0 or v1 waitForEvent", () => {
const expected = {
id: {
type: "data",
data: {
expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});

test("handles event from v0 or v1 waitForEvent", () => {
const expected = {
id: {
type: "data",
data: {
name: "event",
data: { some: "data" },
ts: 123,
},
},
};

const actual = schema.safeParse({
id: {
name: "event",
data: { some: "data" },
ts: 123,
},
},
};

const actual = stepsSchema.safeParse({
id: {
name: "event",
data: { some: "data" },
ts: 123,
},
});
});

expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
expect(actual.success).toBe(true);
expect(actual.success && actual.data).toEqual(expected);
});
});
});
71 changes: 42 additions & 29 deletions packages/inngest/src/api/schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { z } from "zod";
import { ExecutionVersion } from "../components/execution/InngestExecution";
import { failureEventErrorSchema, type EventPayload } from "../types";

export const errorSchema = z.object({
Expand All @@ -7,38 +8,50 @@ export const errorSchema = z.object({
});
export type ErrorResponse = z.infer<typeof errorSchema>;

export const stepsSchema = z
.record(
z
.object({
type: z.literal("data").optional().default("data"),
data: z.any().refine((v) => typeof v !== "undefined", {
message: "Data in steps must be defined",
}),
export const stepsSchemas = {
[ExecutionVersion.V0]: z
.record(
z.any().refine((v) => typeof v !== "undefined", {
message: "Values in steps must be defined",
})
.strict()
.or(
z
.object({
type: z.literal("error").optional().default("error"),
error: failureEventErrorSchema,
})
.strict()
)
)
.optional()
.nullable(),
[ExecutionVersion.V1]: z
.record(
z
.object({
type: z.literal("data").optional().default("data"),
data: z.any().refine((v) => typeof v !== "undefined", {
message: "Data in steps must be defined",
}),
})
.strict()
.or(
z
.object({
type: z.literal("error").optional().default("error"),
error: failureEventErrorSchema,
})
.strict()
)

/**
* If the result isn't a distcint `data` or `error` object, then it's
* likely that the executor has set this directly to a value, for example
* in the case of `sleep` or `waitForEvent`.
*
* In this case, pull the entire value through as data.
*/
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
.or(z.any().transform((v) => ({ type: "data" as const, data: v })))
)
.default({});
/**
* If the result isn't a distcint `data` or `error` object, then it's
* likely that the executor has set this directly to a value, for example
* in the case of `sleep` or `waitForEvent`.
*
* In this case, pull the entire value through as data.
*/
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
.or(z.any().transform((v) => ({ type: "data" as const, data: v })))
)
.default({}),
} satisfies Record<ExecutionVersion, z.ZodSchema>;

export type StepsResponse = z.infer<typeof stepsSchema>;
export type StepsResponse = {
[V in ExecutionVersion]: z.infer<(typeof stepsSchemas)[V]>;
}[ExecutionVersion];

export const batchSchema = z.array(
z.record(z.any()).transform((v) => v as EventPayload)
Expand Down
9 changes: 5 additions & 4 deletions packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,11 @@ export class InngestCommHandler<
const { version } = immediateFnData;

const result = runAsPromise(async () => {
const anyFnData = await fetchAllFnData(
immediateFnData,
this.client["inngestApi"]
);
const anyFnData = await fetchAllFnData({
data: immediateFnData,
api: this.client["inngestApi"],
version,
});
if (!anyFnData.ok) {
throw new Error(anyFnData.error);
}
Expand Down
28 changes: 13 additions & 15 deletions packages/inngest/src/helpers/functions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ZodError, z } from "zod";
import { type InngestApi } from "../api/api";
import { stepsSchema } from "../api/schema";
import { stepsSchemas } from "../api/schema";
import {
ExecutionVersion,
PREFERRED_EXECUTION_VERSION,
Expand Down Expand Up @@ -107,14 +107,7 @@ export const parseFnData = (data: unknown) => {
.object({
event: z.record(z.any()),
events: z.array(z.record(z.any())).default([]),
steps: z
.record(
z.any().refine((v) => typeof v !== "undefined", {
message: "Values in steps must be defined",
})
)
.optional()
.nullable(),
steps: stepsSchemas[ExecutionVersion.V0],
ctx: z
.object({
run_id: z.string(),
Expand Down Expand Up @@ -145,7 +138,7 @@ export const parseFnData = (data: unknown) => {
.object({
event: z.record(z.any()),
events: z.array(z.record(z.any())).default([]),
steps: stepsSchema,
steps: stepsSchemas[ExecutionVersion.V1],
ctx: z
.object({
run_id: z.string(),
Expand Down Expand Up @@ -179,10 +172,15 @@ export const parseFnData = (data: unknown) => {
export type FnData = ReturnType<typeof parseFnData>;

type ParseErr = string;
export const fetchAllFnData = async (
data: FnData,
api: InngestApi
): Promise<Result<FnData, ParseErr>> => {
export const fetchAllFnData = async ({
data,
api,
version,
}: {
data: FnData;
api: InngestApi;
version: ExecutionVersion;
}): Promise<Result<FnData, ParseErr>> => {
const result = { ...data };

try {
Expand All @@ -203,7 +201,7 @@ export const fetchAllFnData = async (

const [evtResp, stepResp] = await Promise.all([
api.getRunBatch(result.ctx.run_id),
api.getRunSteps(result.ctx.run_id),
api.getRunSteps(result.ctx.run_id, version),
]);

if (evtResp.ok) {
Expand Down

0 comments on commit be4971c

Please sign in to comment.