From 484cefcbddf2e239c935f827883820791107cc27 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 23 Oct 2024 21:04:37 +0300 Subject: [PATCH 1/8] feat: add context.cancel --- src/context/context.test.ts | 24 +++++++++++++++- src/context/context.ts | 50 ++++++++++++++++++++++++++++----- src/error.ts | 16 +++++++++-- src/integration.test.ts | 29 ++++++++++++++++++- src/serve/authorization.test.ts | 5 ++++ src/serve/authorization.ts | 7 +++++ src/serve/serve.test.ts | 38 ++++++++++++++++++++++++- src/workflow-requests.test.ts | 32 +++++++++++++++++++++ src/workflow-requests.ts | 12 ++++++-- 9 files changed, 199 insertions(+), 14 deletions(-) diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 4006e3e..8496129 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -4,7 +4,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; -import { QStashWorkflowError } from "../error"; +import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; import { WORKFLOW_ID_HEADER, WORKFLOW_INIT_HEADER, @@ -262,4 +262,26 @@ describe("context tests", () => { }); }); }); + + test("cancel should throw abort with cleanup: true", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + try { + await context.cancel(); + } catch (error) { + expect(error instanceof QStashWorkflowAbort).toBeTrue(); + const _error = error as QStashWorkflowAbort; + expect(_error.stepName).toBe("cancel"); + expect(_error.name).toBe("QStashWorkflowAbort"); + expect(_error.cancelWorkflow).toBeTrue(); + return; + } + throw new Error("Test error: context.cancel should have thrown abort error."); + }); }); diff --git a/src/context/context.ts b/src/context/context.ts index 8937b44..3898d4f 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -13,6 +13,7 @@ import { import type { HTTPMethods } from "@upstash/qstash"; import type { WorkflowLogger } from "../logger"; import { DEFAULT_RETRIES } from "../constants"; +import { QStashWorkflowAbort } from "../error"; /** * Upstash Workflow context @@ -201,7 +202,7 @@ export class WorkflowContext { * const [result1, result2] = await Promise.all([ * context.run("step 1", () => { * return "result1" - * }) + * }), * context.run("step 2", async () => { * return await fetchResults() * }) @@ -224,6 +225,10 @@ export class WorkflowContext { /** * Stops the execution for the duration provided. * + * ```typescript + * await context.sleep('sleep1', 3) // wait for three seconds + * ``` + * * @param stepName * @param duration sleep duration in seconds * @returns undefined @@ -235,6 +240,10 @@ export class WorkflowContext { /** * Stops the execution until the date time provided. * + * ```typescript + * await context.sleepUntil('sleep1', Date.now() / 1000 + 3) // wait for three seconds + * ``` + * * @param stepName * @param datetime time to sleep until. Can be provided as a number (in unix seconds), * as a Date object or a string (passed to `new Date(datetimeString)`) @@ -258,11 +267,11 @@ export class WorkflowContext { * network call without consuming any runtime. * * ```ts - * const postResult = await context.call( - * "post call step", - * `https://www.some-endpoint.com/api`, - * "POST", - * "my-payload" + * const postResult = await context.call("post call step", { + * url: "https://www.some-endpoint.com/api", + * method: "POST", + * body: "my-payload" + * } * ); * ``` * @@ -272,7 +281,7 @@ export class WorkflowContext { * * @param stepName * @param url url to call - * @param method call method + * @param method call method. "GET" by default * @param body call body * @param headers call headers * @returns call result as { @@ -353,6 +362,8 @@ export class WorkflowContext { * }) * ``` * + * Alternatively, you can use the `context.notify` method. + * * @param stepName * @param eventId event id to wake up the waiting workflow run * @param timeout timeout duration in seconds @@ -383,6 +394,20 @@ export class WorkflowContext { } } + /** + * Notify waiting workflow runs + * + * ```ts + * const { eventId, eventData, notifyResponse } = await context.notify( + * "notify step", "event-id", "event-data" + * ); + * ``` + * + * @param stepName + * @param eventId event id to notify + * @param eventData event data to notify with + * @returns notify response which has event id, event data and list of waiters which were notified + */ public async notify( stepName: string, eventId: string, @@ -402,6 +427,17 @@ export class WorkflowContext { } } + /** + * Cancel the current workflow run + * + * Will throw QStashWorkflowAbort to stop workflow execution. + * Shouldn't be inside try/catch. + */ + public async cancel() { + // throw an abort which will make the workflow cancel + throw new QStashWorkflowAbort("cancel", undefined, true); + } + /** * Adds steps to the executor. Needed so that it can be overwritten in * DisabledWorkflowContext. diff --git a/src/error.ts b/src/error.ts index 2dfc106..d4293ea 100644 --- a/src/error.ts +++ b/src/error.ts @@ -12,13 +12,24 @@ export class QStashWorkflowError extends QstashError { } /** - * Raised when the workflow executes a function and aborts + * Raised when the workflow executes a function successfully + * and aborts to end the execution */ export class QStashWorkflowAbort extends Error { public stepInfo?: Step; public stepName: string; + /** + * whether workflow is to be canceled on abort + */ + public cancelWorkflow: boolean; - constructor(stepName: string, stepInfo?: Step) { + /** + * + * @param stepName name of the aborting step + * @param stepInfo step information + * @param cancelWorkflow + */ + constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false) { super( "This is an Upstash Workflow error thrown after a step executes. It is expected to be raised." + " Make sure that you await for each step. Also, if you are using try/catch blocks, you should not wrap context.run/sleep/sleepUntil/call methods with try/catch." + @@ -27,6 +38,7 @@ export class QStashWorkflowAbort extends Error { this.name = "QStashWorkflowAbort"; this.stepName = stepName; this.stepInfo = stepInfo; + this.cancelWorkflow = cancelWorkflow; } } diff --git a/src/integration.test.ts b/src/integration.test.ts index d17ef3f..780e33e 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -76,7 +76,7 @@ type Charge = { success: boolean; }; -class FinishState { +export class FinishState { public finished = false; public finish() { this.finished = true; @@ -755,4 +755,31 @@ describe.skip("live serve tests", () => { ); }); }); + + test( + "cancel workflow", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + expect(input).toBe("my-payload"); + + await context.sleep("sleep", 1); + + finishState.finish(); + await context.cancel(); + + throw new Error("shouldn't reach here"); + }, + }); + }, + { + timeout: 10_000, + } + ); }); diff --git a/src/serve/authorization.test.ts b/src/serve/authorization.test.ts index 345d3eb..de07939 100644 --- a/src/serve/authorization.test.ts +++ b/src/serve/authorization.test.ts @@ -86,6 +86,11 @@ describe("disabled workflow context", () => { }); expect(called).toBeTrue(); }); + + test("shouldn't throw on cancel", () => { + const dontThrow = disabledContext.cancel; + expect(dontThrow).not.toThrowError(); + }); }); describe("tryAuthentication", () => { diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index 6efd782..fffe87d 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -49,6 +49,13 @@ export class DisabledWorkflowContext< throw new QStashWorkflowAbort(DisabledWorkflowContext.disabledMessage); } + /** + * overwrite cancel method to do nothing + */ + public async cancel() { + return; + } + /** * copies the passed context to create a DisabledWorkflowContext. Then, runs the * route function with the new context. diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index a11e505..f3cdf0f 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -564,7 +564,8 @@ describe("serve", () => { }, } ); - await endpoint(request); + const response = await endpoint(request); + expect(response.status).toBe(200); expect(called).toBeTrue(); }); @@ -589,4 +590,39 @@ describe("serve", () => { }); expect(receiver).toBeDefined(); }); + + test("should call qstash to cancel workflow on context.cancel", async () => { + const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo", "my-payload", []); + let called = false; + let runs = false; + const { handler: endpoint } = serve( + async (context) => { + called = true; + await context.cancel(); + await context.run("wont run", () => { + runs = true; + }); + }, + { + qstashClient, + receiver: undefined, + verbose: true, + } + ); + + await mockQStashServer({ + execute: async () => { + const response = await endpoint(request); + expect(response.status).toBe(200); + }, + responseFields: { body: undefined, status: 200 }, + receivesRequest: { + method: "DELETE", + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/wfr-foo?cancel=false`, + token, + }, + }); + expect(called).toBeTrue(); + expect(runs).toBeFalse(); + }); }); diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 4b5e09a..0adfb9a 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -29,6 +29,7 @@ import { mockQStashServer, WORKFLOW_ENDPOINT, } from "./test-utils"; +import { FinishState } from "./integration.test"; describe("Workflow Requests", () => { test("triggerFirstInvocation", async () => { @@ -120,6 +121,37 @@ describe("Workflow Requests", () => { }); }); + test("should call cleanup if context.cancel is called", async () => { + const workflowRunId = nanoid(); + const token = "myToken"; + + const context = new WorkflowContext({ + qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token }), + workflowRunId: workflowRunId, + initialPayload: undefined, + headers: new Headers({}) as Headers, + steps: [], + url: WORKFLOW_ENDPOINT, + }); + + const finished = new FinishState(); + const result = await triggerRouteFunction({ + onStep: async () => { + await context.cancel(); + await context.run("shouldn't call", () => { + throw new Error("shouldn't call context.run"); + }); + }, + onCleanup: async () => { + finished.finish(); + }, + }); + finished.check(); + expect(result.isOk()).toBeTrue(); + // @ts-expect-error value will be set since result isOk + expect(result.value).toBe("workflow-finished"); + }); + test("should call publishJSON in triggerWorkflowDelete", async () => { const workflowRunId = nanoid(); const token = "myToken"; diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index be9064f..08d5cd1 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -64,14 +64,22 @@ export const triggerRouteFunction = async ({ onCleanup: () => Promise; }): Promise | Err> => { try { - // When onStep completes successfully, it throws an exception named `QStashWorkflowAbort`, indicating that the step has been successfully executed. + // When onStep completes successfully, it throws an exception named `QStashWorkflowAbort`, + // indicating that the step has been successfully executed. // This ensures that onCleanup is only called when no exception is thrown. await onStep(); await onCleanup(); return ok("workflow-finished"); } catch (error) { const error_ = error as Error; - return error_ instanceof QStashWorkflowAbort ? ok("step-finished") : err(error_); + if (!(error_ instanceof QStashWorkflowAbort)) { + return err(error_); + } else if (error_.cancelWorkflow) { + await onCleanup(); + return ok("workflow-finished"); + } else { + return ok("step-finished"); + } } }; From 00288e2ce9c882c00f3df2820a96b41ab25e2f4b Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 24 Oct 2024 13:05:10 +0300 Subject: [PATCH 2/8] feat: add lazy fetch and rm context.rawInitialPayload initial payload is still accessible through context.initialPayload --- src/client/utils.ts | 15 +++- src/context/context.ts | 7 -- src/serve/authorization.ts | 1 - src/serve/index.ts | 3 +- src/test-utils.ts | 2 +- src/workflow-parser.test.ts | 163 +++++++++++++++++++++++++++--------- src/workflow-parser.ts | 30 +++++-- 7 files changed, 164 insertions(+), 57 deletions(-) diff --git a/src/client/utils.ts b/src/client/utils.ts index ff96501..7b15559 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -1,5 +1,6 @@ import { Client } from "@upstash/qstash"; -import { NotifyResponse, Waiter } from "../types"; +import { NotifyResponse, RawStep, Waiter } from "../types"; +import { WorkflowLogger } from "../logger"; export const makeNotifyRequest = async ( requester: Client["http"], @@ -25,3 +26,15 @@ export const makeGetWaitersRequest = async ( })) as Required[]; return result; }; + +export const getSteps = async ( + requester: Client["http"], + workflowRunId: string, + debug?: WorkflowLogger +): Promise => { + await debug?.log("INFO", "ENDPOINT_START", "Pulling steps from QStash."); + return (await requester.request({ + path: ["v2", "workflows", "runs", workflowRunId], + parseResponseAsJson: true, + })) as RawStep[]; +}; diff --git a/src/context/context.ts b/src/context/context.ts index 3898d4f..acc8208 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -117,10 +117,6 @@ export class WorkflowContext { * headers of the initial request */ public readonly headers: Headers; - /** - * initial payload as a raw string - */ - public readonly rawInitialPayload: string; /** * Map of environment variables and their values. * @@ -156,7 +152,6 @@ export class WorkflowContext { failureUrl, debug, initialPayload, - rawInitialPayload, env, retries, }: { @@ -168,7 +163,6 @@ export class WorkflowContext { failureUrl?: string; debug?: WorkflowLogger; initialPayload: TInitialPayload; - rawInitialPayload?: string; // optional for tests env?: Record; retries?: number; }) { @@ -179,7 +173,6 @@ export class WorkflowContext { this.failureUrl = failureUrl; this.headers = headers; this.requestPayload = initialPayload; - this.rawInitialPayload = rawInitialPayload ?? JSON.stringify(this.requestPayload); this.env = env ?? {}; this.retries = retries ?? DEFAULT_RETRIES; diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index fffe87d..c59f26e 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -82,7 +82,6 @@ export class DisabledWorkflowContext< url: context.url, failureUrl: context.failureUrl, initialPayload: context.requestPayload, - rawInitialPayload: context.rawInitialPayload, env: context.env, retries: context.retries, }); diff --git a/src/serve/index.ts b/src/serve/index.ts index d05817f..0cd48f0 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -79,6 +79,8 @@ export const serve = < const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, isFirstInvocation, + workflowRunId, + qstashClient.http, debug ); @@ -109,7 +111,6 @@ export const serve = < qstashClient, workflowRunId, initialPayload: initialPayloadParser(rawInitialPayload), - rawInitialPayload, headers: recreateUserHeaders(request.headers as Headers), steps, url: workflowUrl, diff --git a/src/test-utils.ts b/src/test-utils.ts index 03d35a5..4cd1343 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -76,7 +76,7 @@ export const mockQStashServer = async ({ } } catch (error) { if (error instanceof Error) { - console.error("Assertion error:", error.message); + console.error(error); return new Response(`assertion in mock QStash failed.`, { status: 400, }); diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index 1bb733f..0ffda1f 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -10,10 +10,16 @@ import { } from "./constants"; import { nanoid } from "./utils"; import type { RawStep, Step, WaitStepResponse, WorkflowServeOptions } from "./types"; -import { getRequest, WORKFLOW_ENDPOINT } from "./test-utils"; +import { + getRequest, + MOCK_QSTASH_SERVER_URL, + mockQStashServer, + WORKFLOW_ENDPOINT, +} from "./test-utils"; import { formatWorkflowError, QStashWorkflowError } from "./error"; import { Client } from "@upstash/qstash"; import { processOptions } from "./serve/options"; +import { FinishState } from "./integration.test"; describe("Workflow Parser", () => { describe("validateRequest", () => { @@ -88,6 +94,10 @@ describe("Workflow Parser", () => { }); describe("parseRequest", () => { + const token = nanoid(); + const workflowRunId = nanoid(); + const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + test("should handle first invocation", async () => { const payload = { initial: "payload" }; const rawPayload = JSON.stringify(payload); @@ -95,28 +105,76 @@ describe("Workflow Parser", () => { body: rawPayload, }); + const finised = new FinishState(); const requestPayload = (await getPayload(request)) ?? ""; - const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( - requestPayload, - true - ); - - // payload isn't parsed - expect(typeof rawInitialPayload).toBe("string"); - expect(rawInitialPayload).toBe(rawPayload); - // steps are empty: - expect(steps).toEqual([]); - expect(isLastDuplicate).toBeFalse(); + await mockQStashServer({ + execute: async () => { + const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( + requestPayload, + true, + workflowRunId, + qstashClient.http + ); + + // payload isn't parsed + expect(typeof rawInitialPayload).toBe("string"); + expect(rawInitialPayload).toBe(rawPayload); + // steps are empty: + expect(steps).toEqual([]); + expect(isLastDuplicate).toBeFalse(); + finised.finish(); + }, + // shouldn't call get steps + receivesRequest: false, + responseFields: { + body: {}, + status: 200, + }, + }); + finised.check(); }); - test("should throw when not first invocation and body is missing", async () => { + test("should fetch steps when not first invocation and body is missing", async () => { + const payload = "my-payload"; const request = new Request(WORKFLOW_ENDPOINT); const requestPayload = (await getPayload(request)) ?? ""; - const throws = parseRequest(requestPayload, false); - expect(throws).rejects.toThrow( - new QStashWorkflowError("Only first call can have an empty body") - ); + const finised = new FinishState(); + + const responseBody: RawStep[] = [ + { + messageId: "msg-id", + body: btoa(JSON.stringify(payload)), + callType: "step", + }, + ]; + await mockQStashServer({ + execute: async () => { + const result = await parseRequest( + requestPayload, + false, + workflowRunId, + qstashClient.http + ); + expect(result.rawInitialPayload).toBe(JSON.stringify(payload)); + expect(result.steps.length).toBe(1); + expect(result.steps[0].out).toBe(JSON.stringify(payload)); + finised.finish(); + }, + // should call get steps + receivesRequest: { + headers: {}, + method: "GET", + token, + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/${workflowRunId}`, + }, + responseFields: { + body: responseBody, + status: 200, + }, + }); + + finised.check(); }); test("should return steps and initial payload correctly", async () => { @@ -143,7 +201,9 @@ describe("Workflow Parser", () => { const requestPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, - false + false, + workflowRunId, + qstashClient.http ); // payload is not parsed @@ -216,7 +276,9 @@ describe("Workflow Parser", () => { const requestPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(reqiestInitialPayload); @@ -270,7 +332,9 @@ describe("Workflow Parser", () => { const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( JSON.stringify(payload), - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe("initial"); @@ -294,6 +358,10 @@ describe("Workflow Parser", () => { }); describe("parseRequest with duplicates", () => { + const token = nanoid(); + const workflowRunId = nanoid(); + const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + const requestPayload = "myPayload"; const initStep: Step = { stepId: 0, @@ -302,7 +370,6 @@ describe("Workflow Parser", () => { out: requestPayload, concurrent: 1, }; - const workflowId = "wfr-foo"; test("should ignore extra init steps", async () => { // prettier-ignore @@ -311,12 +378,14 @@ describe("Workflow Parser", () => { {stepId: 1, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1}, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -339,12 +408,14 @@ describe("Workflow Parser", () => { {stepId: 0, stepName: "successStep2", stepType: "Run", concurrent: 2, targetStep: 5}, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -372,12 +443,14 @@ describe("Workflow Parser", () => { {stepId: 0, stepName: "successStep2", stepType: "Run", concurrent: 2, targetStep: 5}, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -407,12 +480,14 @@ describe("Workflow Parser", () => { {stepId: 5, stepName: "successStep2", stepType: "Run", out: "20", concurrent: 2}, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -443,12 +518,14 @@ describe("Workflow Parser", () => { {stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2}, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -475,12 +552,14 @@ describe("Workflow Parser", () => { {stepId: 2, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1}, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -502,12 +581,14 @@ describe("Workflow Parser", () => { {stepId: 2, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1}, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -541,12 +622,14 @@ describe("Workflow Parser", () => { {stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2}, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -584,12 +667,14 @@ describe("Workflow Parser", () => { {stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2}, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 4ca4536..c476ed6 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -20,6 +20,8 @@ import type { WorkflowLogger } from "./logger"; import { WorkflowContext } from "./context"; import { recreateUserHeaders } from "./workflow-requests"; import { decodeBase64, nanoid } from "./utils"; +import { getSteps } from "./client/utils"; +import { Client } from "@upstash/qstash"; /** * Gets the request body. If that fails, returns undefined @@ -48,8 +50,9 @@ export const getPayload = async (request: Request) => { * @param rawPayload body of the request as a string as explained above * @returns intiial payload and list of steps */ -const parsePayload = async (rawPayload: string, debug?: WorkflowLogger) => { - const [encodedInitialPayload, ...encodedSteps] = JSON.parse(rawPayload) as RawStep[]; +const parsePayload = async (payload: string | RawStep[], debug?: WorkflowLogger) => { + const [encodedInitialPayload, ...encodedSteps] = + typeof payload === "string" ? (JSON.parse(payload) as RawStep[]) : payload; // decode initial payload: const rawInitialPayload = decodeBase64(encodedInitialPayload.body); @@ -220,6 +223,8 @@ export const validateRequest = ( export const parseRequest = async ( requestPayload: string | undefined, isFirstInvocation: boolean, + workflowRunId: string, + requester: Client["http"], debug?: WorkflowLogger ): Promise<{ rawInitialPayload: string; @@ -234,11 +239,17 @@ export const parseRequest = async ( isLastDuplicate: false, }; } else { - // if not the first invocation, make sure that body is not empty and parse payload if (!requestPayload) { - throw new QStashWorkflowError("Only first call can have an empty body"); + await debug?.log( + "INFO", + "ENDPOINT_START", + "request payload is empty, steps will be fetched from QStash." + ); } - const { rawInitialPayload, steps } = await parsePayload(requestPayload, debug); + const { rawInitialPayload, steps } = await parsePayload( + requestPayload ? requestPayload : await getSteps(requester, workflowRunId), + debug + ); const isLastDuplicate = await checkIfLastOneIsDuplicate(steps, debug); const deduplicatedSteps = deduplicateSteps(steps); @@ -306,14 +317,19 @@ export const handleFailure = async ( steps, // eslint-disable-next-line @typescript-eslint/no-unused-vars isLastDuplicate: _isLastDuplicate, - } = await parseRequest(decodeBase64(sourceBody), false, debug); + } = await parseRequest( + decodeBase64(sourceBody), + false, + workflowRunId, + qstashClient.http, + debug + ); // create context const workflowContext = new WorkflowContext({ qstashClient, workflowRunId, initialPayload: initialPayloadParser(rawInitialPayload), - rawInitialPayload, headers: recreateUserHeaders(new Headers(sourceHeader) as Headers), steps, url: url, From 9b94610b43dd727b44df23ff869cc13e083812f5 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 24 Oct 2024 13:18:13 +0300 Subject: [PATCH 3/8] fix: context.cancel run state would become run_success. now it becomes run_canceled --- src/client/index.ts | 9 ++------- src/client/utils.ts | 9 +++++++++ src/serve/index.ts | 4 ++++ src/serve/serve.test.ts | 2 +- src/workflow-requests.test.ts | 17 ++++++++++++++++- src/workflow-requests.ts | 4 +++- 6 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 82dc42d..8304503 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1,6 +1,6 @@ import { NotifyResponse, Waiter } from "../types"; import { Client as QStashClient } from "@upstash/qstash"; -import { makeGetWaitersRequest, makeNotifyRequest } from "./utils"; +import { makeCancelRequest, makeGetWaitersRequest, makeNotifyRequest } from "./utils"; type ClientConfig = ConstructorParameters[0]; @@ -37,12 +37,7 @@ export class Client { * @returns true if workflow is succesfully deleted. Otherwise throws QStashError */ public async cancel({ workflowRunId }: { workflowRunId: string }) { - const result = (await this.client.http.request({ - path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`], - method: "DELETE", - parseResponseAsJson: false, - })) as { error: string } | undefined; - return result ?? true; + return await makeCancelRequest(this.client.http, workflowRunId); } /** diff --git a/src/client/utils.ts b/src/client/utils.ts index 7b15559..1533661 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -27,6 +27,15 @@ export const makeGetWaitersRequest = async ( return result; }; +export const makeCancelRequest = async (requester: Client["http"], workflowRunId: string) => { + const result = (await requester.request({ + path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`], + method: "DELETE", + parseResponseAsJson: false, + })) as { error: string } | undefined; + return result ?? true; +}; + export const getSteps = async ( requester: Client["http"], workflowRunId: string, diff --git a/src/serve/index.ts b/src/serve/index.ts index 0cd48f0..3439fda 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -1,3 +1,4 @@ +import { makeCancelRequest } from "../client/utils"; import { WorkflowContext } from "../context"; import { formatWorkflowError } from "../error"; import { WorkflowLogger } from "../logger"; @@ -160,6 +161,9 @@ export const serve = < onCleanup: async () => { await triggerWorkflowDelete(workflowContext, debug); }, + onCancel: async () => { + await makeCancelRequest(workflowContext.qstashClient.http, workflowRunId); + }, }); if (result.isErr()) { diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index f3cdf0f..1cfee0e 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -618,7 +618,7 @@ describe("serve", () => { responseFields: { body: undefined, status: 200 }, receivesRequest: { method: "DELETE", - url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/wfr-foo?cancel=false`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/wfr-foo?cancel=true`, token, }, }); diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 0adfb9a..e7325de 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -76,6 +76,9 @@ describe("Workflow Requests", () => { onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk @@ -90,6 +93,9 @@ describe("Workflow Requests", () => { onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk @@ -104,6 +110,9 @@ describe("Workflow Requests", () => { onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isErr()).toBeTrue(); }); @@ -116,12 +125,15 @@ describe("Workflow Requests", () => { onCleanup: () => { throw new Error("Something went wrong!"); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isErr()).toBeTrue(); }); }); - test("should call cleanup if context.cancel is called", async () => { + test("should call onCancel if context.cancel is called", async () => { const workflowRunId = nanoid(); const token = "myToken"; @@ -143,6 +155,9 @@ describe("Workflow Requests", () => { }); }, onCleanup: async () => { + throw new Error("shouldn't call"); + }, + onCancel: async () => { finished.finish(); }, }); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index 08d5cd1..aa4d8e3 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -59,9 +59,11 @@ export const triggerFirstInvocation = async ( export const triggerRouteFunction = async ({ onCleanup, onStep, + onCancel, }: { onStep: () => Promise; onCleanup: () => Promise; + onCancel: () => Promise; }): Promise | Err> => { try { // When onStep completes successfully, it throws an exception named `QStashWorkflowAbort`, @@ -75,7 +77,7 @@ export const triggerRouteFunction = async ({ if (!(error_ instanceof QStashWorkflowAbort)) { return err(error_); } else if (error_.cancelWorkflow) { - await onCleanup(); + await onCancel(); return ok("workflow-finished"); } else { return ok("step-finished"); From 0ef8478f388eb7dd791ee32c3eb60982337709ab Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 24 Oct 2024 16:42:48 +0300 Subject: [PATCH 4/8] feat: rename errors --- src/context/auto-executor.test.ts | 28 ++++++++++++------------ src/context/auto-executor.ts | 36 +++++++++++++++---------------- src/context/context.test.ts | 16 +++++++------- src/context/context.ts | 6 +++--- src/error.ts | 8 +++---- src/index.ts | 2 +- src/serve/authorization.test.ts | 16 +++++++------- src/serve/authorization.ts | 10 ++++----- src/workflow-parser.test.ts | 12 +++++------ src/workflow-parser.ts | 12 +++++------ src/workflow-requests.test.ts | 6 +++--- src/workflow-requests.ts | 12 +++++------ 12 files changed, 81 insertions(+), 83 deletions(-) diff --git a/src/context/auto-executor.test.ts b/src/context/auto-executor.test.ts index 5e33706..c2680e4 100644 --- a/src/context/auto-executor.test.ts +++ b/src/context/auto-executor.test.ts @@ -6,7 +6,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { nanoid } from "../utils"; import { AutoExecutor } from "./auto-executor"; import type { Step } from "../types"; -import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; class SpyAutoExecutor extends AutoExecutor { public declare getParallelCallState; @@ -106,7 +106,7 @@ describe("auto-executor", () => { const throws = context.run("attemptCharge", () => { return { input: context.requestPayload, success: false }; }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -195,7 +195,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -262,7 +262,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -314,7 +314,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -366,7 +366,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -434,7 +434,7 @@ describe("auto-executor", () => { return true; }); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step name. Expected 'wrongName', got 'attemptCharge' from the request" ) ); @@ -443,7 +443,7 @@ describe("auto-executor", () => { const context = getContext([initialStep, singleStep]); const throws = context.sleep("attemptCharge", 10); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step type. Expected 'SleepFor', got 'Run' from the request" ) ); @@ -460,7 +460,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step name. Expected 'wrongName', got 'sleep for some time' from the request" ) ); @@ -474,7 +474,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step type. Expected 'SleepUntil', got 'SleepFor' from the request" ) ); @@ -490,7 +490,7 @@ describe("auto-executor", () => { context.sleep("wrongName", 10), // wrong step name context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }); test("step type", () => { const context = getContext([initialStep, ...parallelSteps.slice(0, 3)]); @@ -500,7 +500,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep for some time", 10), // wrong step type context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }); }); @@ -514,7 +514,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrowError( - new QStashWorkflowError( + new WorkflowError( "Incompatible steps detected in parallel execution: Incompatible step name. Expected 'wrongName', got 'sleep for some time' from the request\n" + ' > Step Names from the request: ["sleep for some time","sleep until next day"]\n' + ' Step Types from the request: ["SleepFor","SleepUntil"]\n' + @@ -532,7 +532,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrowError( - new QStashWorkflowError( + new WorkflowError( "Incompatible steps detected in parallel execution: Incompatible step type. Expected 'SleepUntil', got 'SleepFor' from the request\n" + ' > Step Names from the request: ["sleep for some time","sleep until next day"]\n' + ' Step Types from the request: ["SleepFor","SleepUntil"]\n' + diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index 930fa52..1659de4 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -1,4 +1,4 @@ -import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; import type { WorkflowContext } from "./context"; import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types"; import { type BaseLazyStep } from "./steps"; @@ -39,14 +39,14 @@ export class AutoExecutor { * * If a function is already executing (this.executingStep), this * means that there is a nested step which is not allowed. In this - * case, addStep throws QStashWorkflowError. + * case, addStep throws WorkflowError. * * @param stepInfo step plan to add * @returns result of the step function */ public async addStep(stepInfo: BaseLazyStep) { if (this.executingStep) { - throw new QStashWorkflowError( + throw new WorkflowError( "A step can not be run inside another step." + ` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'` ); @@ -159,7 +159,7 @@ export class AutoExecutor { if (parallelCallState !== "first" && plannedParallelStepCount !== parallelSteps.length) { // user has added/removed a parallel step - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible number of parallel steps when call state was '${parallelCallState}'.` + ` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.` ); @@ -194,7 +194,7 @@ export class AutoExecutor { */ const planStep = this.steps.at(-1); if (!planStep || planStep.targetStep === undefined) { - throw new QStashWorkflowError( + throw new WorkflowError( `There must be a last step and it should have targetStep larger than 0.` + `Received: ${JSON.stringify(planStep)}` ); @@ -217,10 +217,10 @@ export class AutoExecutor { ); await this.submitStepsToQStash([resultStep]); } catch (error) { - if (error instanceof QStashWorkflowAbort) { + if (error instanceof WorkflowAbort) { throw error; } - throw new QStashWorkflowError( + throw new WorkflowError( `Error submitting steps to QStash in partial parallel step execution: ${error}` ); } @@ -235,7 +235,7 @@ export class AutoExecutor { * This call to the API should be discarded: no operations are to be made. Parallel steps which are still * running will finish and call QStash eventually. */ - throw new QStashWorkflowAbort("discarded parallel"); + throw new WorkflowAbort("discarded parallel"); } case "last": { /** @@ -312,7 +312,7 @@ export class AutoExecutor { private async submitStepsToQStash(steps: Step[]) { // if there are no steps, something went wrong. Raise exception if (steps.length === 0) { - throw new QStashWorkflowError( + throw new WorkflowError( `Unable to submit steps to QStash. Provided list is empty. Current step: ${this.stepCount}` ); } @@ -359,7 +359,7 @@ export class AutoExecutor { parseResponseAsJson: false, }); - throw new QStashWorkflowAbort(steps[0].stepName, steps[0]); + throw new WorkflowAbort(steps[0].stepName, steps[0]); } const result = await this.context.qstashClient.batchJSON( @@ -415,7 +415,7 @@ export class AutoExecutor { }); // if the steps are sent successfully, abort to stop the current request - throw new QStashWorkflowAbort(steps[0].stepName, steps[0]); + throw new WorkflowAbort(steps[0].stepName, steps[0]); } /** @@ -448,7 +448,7 @@ export class AutoExecutor { ) { return result[index] as TResult; } else { - throw new QStashWorkflowError( + throw new WorkflowError( `Unexpected parallel call result while executing step ${index}: '${result}'. Expected ${lazyStepList.length} many items` ); } @@ -465,7 +465,7 @@ export class AutoExecutor { * from the incoming request; compare the step names and types to make sure * that they are the same. * - * Raises `QStashWorkflowError` if there is a difference. + * Raises `WorkflowError` if there is a difference. * * @param lazyStep lazy step created during execution * @param stepFromRequest step parsed from incoming request @@ -473,14 +473,14 @@ export class AutoExecutor { const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => { // check step name if (lazyStep.stepName !== stepFromRequest.stepName) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible step name. Expected '${lazyStep.stepName}',` + ` got '${stepFromRequest.stepName}' from the request` ); } // check type name if (lazyStep.stepType !== stepFromRequest.stepType) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible step type. Expected '${lazyStep.stepType}',` + ` got '${stepFromRequest.stepType}' from the request` ); @@ -491,7 +491,7 @@ const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => { * validates that each lazy step and step from request has the same step * name and type using `validateStep` method. * - * If there is a difference, raises `QStashWorkflowError` with information + * If there is a difference, raises `WorkflowError` with information * about the difference. * * @param lazySteps list of lazy steps created during parallel execution @@ -503,12 +503,12 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step validateStep(lazySteps[index], stepFromRequest); } } catch (error) { - if (error instanceof QStashWorkflowError) { + if (error instanceof WorkflowError) { const lazyStepNames = lazySteps.map((lazyStep) => lazyStep.stepName); const lazyStepTypes = lazySteps.map((lazyStep) => lazyStep.stepType); const requestStepNames = stepsFromRequest.map((step) => step.stepName); const requestStepTypes = stepsFromRequest.map((step) => step.stepType); - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible steps detected in parallel execution: ${error.message}` + `\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` + `\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` + diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 8496129..560b8ae 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -4,7 +4,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; -import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; import { WORKFLOW_ID_HEADER, WORKFLOW_INIT_HEADER, @@ -33,7 +33,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner step' inside 'outer step'" ) ); @@ -55,7 +55,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner sleep' inside 'outer step'" ) ); @@ -77,7 +77,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner sleepUntil' inside 'outer step'" ) ); @@ -99,7 +99,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner call' inside 'outer step'" ) ); @@ -275,10 +275,10 @@ describe("context tests", () => { try { await context.cancel(); } catch (error) { - expect(error instanceof QStashWorkflowAbort).toBeTrue(); - const _error = error as QStashWorkflowAbort; + expect(error instanceof WorkflowAbort).toBeTrue(); + const _error = error as WorkflowAbort; expect(_error.stepName).toBe("cancel"); - expect(_error.name).toBe("QStashWorkflowAbort"); + expect(_error.name).toBe("WorkflowAbort"); expect(_error.cancelWorkflow).toBeTrue(); return; } diff --git a/src/context/context.ts b/src/context/context.ts index acc8208..930b125 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -13,7 +13,7 @@ import { import type { HTTPMethods } from "@upstash/qstash"; import type { WorkflowLogger } from "../logger"; import { DEFAULT_RETRIES } from "../constants"; -import { QStashWorkflowAbort } from "../error"; +import { WorkflowAbort } from "../error"; /** * Upstash Workflow context @@ -423,12 +423,12 @@ export class WorkflowContext { /** * Cancel the current workflow run * - * Will throw QStashWorkflowAbort to stop workflow execution. + * Will throw WorkflowAbort to stop workflow execution. * Shouldn't be inside try/catch. */ public async cancel() { // throw an abort which will make the workflow cancel - throw new QStashWorkflowAbort("cancel", undefined, true); + throw new WorkflowAbort("cancel", undefined, true); } /** diff --git a/src/error.ts b/src/error.ts index d4293ea..dca6e7c 100644 --- a/src/error.ts +++ b/src/error.ts @@ -4,10 +4,10 @@ import type { FailureFunctionPayload, Step } from "./types"; /** * Error raised during Workflow execution */ -export class QStashWorkflowError extends QstashError { +export class WorkflowError extends QstashError { constructor(message: string) { super(message); - this.name = "QStashWorkflowError"; + this.name = "WorkflowError"; } } @@ -15,7 +15,7 @@ export class QStashWorkflowError extends QstashError { * Raised when the workflow executes a function successfully * and aborts to end the execution */ -export class QStashWorkflowAbort extends Error { +export class WorkflowAbort extends Error { public stepInfo?: Step; public stepName: string; /** @@ -35,7 +35,7 @@ export class QStashWorkflowAbort extends Error { " Make sure that you await for each step. Also, if you are using try/catch blocks, you should not wrap context.run/sleep/sleepUntil/call methods with try/catch." + ` Aborting workflow after executing step '${stepName}'.` ); - this.name = "QStashWorkflowAbort"; + this.name = "WorkflowAbort"; this.stepName = stepName; this.stepInfo = stepInfo; this.cancelWorkflow = cancelWorkflow; diff --git a/src/index.ts b/src/index.ts index f0fd609..7c868e2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,4 +3,4 @@ export * from "./context"; export * from "./types"; export * from "./logger"; export * from "./client"; -export { QStashWorkflowError, QStashWorkflowAbort } from "./error"; +export { WorkflowError, WorkflowAbort } from "./error"; diff --git a/src/serve/authorization.test.ts b/src/serve/authorization.test.ts index de07939..54109fc 100644 --- a/src/serve/authorization.test.ts +++ b/src/serve/authorization.test.ts @@ -4,7 +4,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { WorkflowContext } from "../context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; -import { QStashWorkflowAbort } from "../error"; +import { WorkflowAbort } from "../error"; import type { RouteFunction } from "../types"; import { DisabledWorkflowContext } from "./authorization"; @@ -27,7 +27,7 @@ describe("disabled workflow context", () => { const throws = disabledContext.run("run-step", () => { return 1; }); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -43,7 +43,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.sleep("sleep-step", 1); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -59,7 +59,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.sleepUntil("sleepUntil-step", 1); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -75,7 +75,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.call("call-step", { url: "some-url" }); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -189,7 +189,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", async () => { return await Promise.resolve("result"); }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; }, responseFields: { @@ -242,7 +242,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", () => { return Promise.resolve("result"); }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; }, responseFields: { @@ -295,7 +295,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", () => { return "result"; }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; called = true; }, diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index c59f26e..d87ffaf 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -1,13 +1,13 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowAbort } from "../error"; +import { WorkflowAbort } from "../error"; import { RouteFunction } from "../types"; import { WorkflowContext } from "../context"; import { BaseLazyStep } from "../context/steps"; import { Client } from "@upstash/qstash"; /** - * Workflow context which throws QStashWorkflowAbort before running the steps. + * Workflow context which throws WorkflowAbort before running the steps. * * Used for making a dry run before running any steps to check authentication. * @@ -37,7 +37,7 @@ export class DisabledWorkflowContext< private static readonly disabledMessage = "disabled-qstash-worklfow-run"; /** - * overwrite the WorkflowContext.addStep method to always raise QStashWorkflowAbort + * overwrite the WorkflowContext.addStep method to always raise WorkflowAbort * error in order to stop the execution whenever we encounter a step. * * @param _step @@ -46,7 +46,7 @@ export class DisabledWorkflowContext< // eslint-disable-next-line @typescript-eslint/no-unused-vars _step: BaseLazyStep ): Promise { - throw new QStashWorkflowAbort(DisabledWorkflowContext.disabledMessage); + throw new WorkflowAbort(DisabledWorkflowContext.disabledMessage); } /** @@ -89,7 +89,7 @@ export class DisabledWorkflowContext< try { await routeFunction(disabledContext); } catch (error) { - if (error instanceof QStashWorkflowAbort && error.stepName === this.disabledMessage) { + if (error instanceof WorkflowAbort && error.stepName === this.disabledMessage) { return ok("step-found"); } return err(error as Error); diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index 0ffda1f..7015e5f 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -16,7 +16,7 @@ import { mockQStashServer, WORKFLOW_ENDPOINT, } from "./test-utils"; -import { formatWorkflowError, QStashWorkflowError } from "./error"; +import { formatWorkflowError, WorkflowError } from "./error"; import { Client } from "@upstash/qstash"; import { processOptions } from "./serve/options"; import { FinishState } from "./integration.test"; @@ -58,7 +58,7 @@ describe("Workflow Parser", () => { }); const throws = () => validateRequest(request); - expect(throws).toThrow(new QStashWorkflowError("Couldn't get workflow id from header")); + expect(throws).toThrow(new WorkflowError("Couldn't get workflow id from header")); }); test("should throw when protocol version is incompatible", () => { @@ -71,7 +71,7 @@ describe("Workflow Parser", () => { const throws = () => validateRequest(request); expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( `Incompatible workflow sdk protocol version.` + ` Expected ${WORKFLOW_PROTOCOL_VERSION}, got ${requestProtocol} from the request.` ) @@ -707,7 +707,7 @@ describe("Workflow Parser", () => { const body = { status: 201, header: { myHeader: "value" }, - body: btoa(JSON.stringify(formatWorkflowError(new QStashWorkflowError(failMessage)))), + body: btoa(JSON.stringify(formatWorkflowError(new WorkflowError(failMessage)))), url: WORKFLOW_ENDPOINT, sourceHeader: { Authorization: authorization, @@ -749,7 +749,7 @@ describe("Workflow Parser", () => { expect(result2.isOk() && result2.value === "not-failure-callback").toBeTrue(); }); - test("should throw QStashWorkflowError if header is set but function is not passed", async () => { + test("should throw WorkflowError if header is set but function is not passed", async () => { const request = new Request(WORKFLOW_ENDPOINT, { headers: { [WORKFLOW_FAILURE_HEADER]: "true", @@ -758,7 +758,7 @@ describe("Workflow Parser", () => { const result = await handleFailure(request, "", client, initialPayloadParser); expect(result.isErr()).toBeTrue(); - expect(result.isErr() && result.error.name).toBe(QStashWorkflowError.name); + expect(result.isErr() && result.error.name).toBe(WorkflowError.name); expect(result.isErr() && result.error.message).toBe( "Workflow endpoint is called to handle a failure," + " but a failureFunction is not provided in serve options." + diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index c476ed6..0bbcd8c 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -1,6 +1,6 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowError } from "./error"; +import { WorkflowError } from "./error"; import { NO_CONCURRENCY, WORKFLOW_FAILURE_HEADER, @@ -175,7 +175,7 @@ const checkIfLastOneIsDuplicate = async ( * Validates the incoming request checking the workflow protocol * version and whether it is the first invocation. * - * Raises `QStashWorkflowError` if: + * Raises `WorkflowError` if: * - it's not the first invocation and expected protocol version doesn't match * the request. * - it's not the first invocation but there is no workflow id in the headers. @@ -191,7 +191,7 @@ export const validateRequest = ( // if it's not the first invocation, verify that the workflow protocal version is correct if (!isFirstInvocation && versionHeader !== WORKFLOW_PROTOCOL_VERSION) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible workflow sdk protocol version. Expected ${WORKFLOW_PROTOCOL_VERSION},` + ` got ${versionHeader} from the request.` ); @@ -202,7 +202,7 @@ export const validateRequest = ( ? `wfr_${nanoid()}` : (request.headers.get(WORKFLOW_ID_HEADER) ?? ""); if (workflowRunId.length === 0) { - throw new QStashWorkflowError("Couldn't get workflow id from header"); + throw new WorkflowError("Couldn't get workflow id from header"); } return { @@ -266,7 +266,7 @@ export const parseRequest = async ( * attempts to call the failureFunction function. * * If the header is set but failureFunction is not passed, returns - * QStashWorkflowError. + * WorkflowError. * * @param request incoming request * @param failureFunction function to handle the failure @@ -287,7 +287,7 @@ export const handleFailure = async ( if (!failureFunction) { return err( - new QStashWorkflowError( + new WorkflowError( "Workflow endpoint is called to handle a failure," + " but a failureFunction is not provided in serve options." + " Either provide a failureUrl or a failureFunction." diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index e7325de..f2f940e 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -10,7 +10,7 @@ import { triggerRouteFunction, triggerWorkflowDelete, } from "./workflow-requests"; -import { QStashWorkflowAbort } from "./error"; +import { WorkflowAbort } from "./error"; import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import type { Step, StepType } from "./types"; @@ -68,10 +68,10 @@ describe("Workflow Requests", () => { }); describe("triggerRouteFunction", () => { - test("should get step-finished when QStashWorkflowAbort is thrown", async () => { + test("should get step-finished when WorkflowAbort is thrown", async () => { const result = await triggerRouteFunction({ onStep: () => { - throw new QStashWorkflowAbort("name"); + throw new WorkflowAbort("name"); }, onCleanup: async () => { await Promise.resolve(); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index aa4d8e3..c49ec57 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -1,6 +1,6 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowAbort, QStashWorkflowError } from "./error"; +import { WorkflowAbort, WorkflowError } from "./error"; import type { WorkflowContext } from "./context"; import { DEFAULT_CONTENT_TYPE, @@ -66,7 +66,7 @@ export const triggerRouteFunction = async ({ onCancel: () => Promise; }): Promise | Err> => { try { - // When onStep completes successfully, it throws an exception named `QStashWorkflowAbort`, + // When onStep completes successfully, it throws an exception named `WorkflowAbort`, // indicating that the step has been successfully executed. // This ensures that onCleanup is only called when no exception is thrown. await onStep(); @@ -74,7 +74,7 @@ export const triggerRouteFunction = async ({ return ok("workflow-finished"); } catch (error) { const error_ = error as Error; - if (!(error_ instanceof QStashWorkflowAbort)) { + if (!(error_ instanceof WorkflowAbort)) { return err(error_); } else if (error_.cancelWorkflow) { await onCancel(); @@ -262,9 +262,7 @@ export const handleThirdPartyCallResult = async ( } catch (error) { const isCallReturn = request.headers.get("Upstash-Workflow-Callback"); return err( - new QStashWorkflowError( - `Error when handling call return (isCallReturn=${isCallReturn}): ${error}` - ) + new WorkflowError(`Error when handling call return (isCallReturn=${isCallReturn}): ${error}`) ); } }; @@ -414,7 +412,7 @@ export const verifyRequest = async ( throw new Error("Signature in `Upstash-Signature` header is not valid"); } } catch (error) { - throw new QStashWorkflowError( + throw new WorkflowError( `Failed to verify that the Workflow request comes from QStash: ${error}\n\n` + "If signature is missing, trigger the workflow endpoint by publishing your request to QStash instead of calling it directly.\n\n" + "If you want to disable QStash Verification, you should clear env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY" From 30e5315487a13daf65798e1ce0513a8d86527e30 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 31 Oct 2024 18:14:02 +0300 Subject: [PATCH 5/8] fix: add lazyFetch for initial payload and context.call --- src/context/auto-executor.test.ts | 5 + src/context/context.test.ts | 4 + src/integration.test.ts | 148 ++++++++++++++++++++++++++++++ src/serve/authorization.test.ts | 3 + src/serve/serve.test.ts | 8 ++ src/workflow-requests.test.ts | 9 +- src/workflow-requests.ts | 31 ++++++- 7 files changed, 206 insertions(+), 2 deletions(-) diff --git a/src/context/auto-executor.test.ts b/src/context/auto-executor.test.ts index c2680e4..da34974 100644 --- a/src/context/auto-executor.test.ts +++ b/src/context/auto-executor.test.ts @@ -121,6 +121,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -211,6 +212,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-delay": "123s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -225,6 +227,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -277,6 +280,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -329,6 +333,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 560b8ae..665721a 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -138,6 +138,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "2", @@ -186,6 +187,7 @@ describe("context tests", () => { timeout: "20s", timeoutHeaders: { "Content-Type": ["application/json"], + "Upstash-Feature-Set": ["LazyFetch"], [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"], "Upstash-Retries": ["3"], "Upstash-Workflow-CallType": ["step"], @@ -235,6 +237,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -249,6 +252,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", diff --git a/src/integration.test.ts b/src/integration.test.ts index 780e33e..3a1d395 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -782,4 +782,152 @@ describe.skip("live serve tests", () => { timeout: 10_000, } ); + + describe("lazy fetch", () => { + // create 5 mb payload. + // lazy fetch will become enabled for payloads larger than 3mb + const largeObject = "x".repeat(4 * 1024 * 1024); + + test( + "large payload", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: largeObject, + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe(largeObject); + + const result = await context.run("step1", async () => { + return "step-1-result"; + }); + expect(result).toBe("step-1-result"); + + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + test.skip( + "large parallel step response", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 4, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe("my-payload"); + + const results = await Promise.all([ + context.run("step1", () => { + return largeObject; + }), + context.sleep("sleep1", 1), + context.run("step2", () => { + return largeObject; + }), + context.sleep("sleep2", 1), + ]); + + expect(results[0]).toBe(largeObject); + expect(results[1]).toBe(undefined); + expect(results[2]).toBe(largeObject); + expect(results[3]).toBe(undefined); + + await context.sleep("check", 1); + + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + + test.skip( + "large error", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + retries: 0, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe("my-payload"); + + await context.run("step1", async () => { + throw new Error(largeObject); + }); + }, + failureFunction(context, failStatus, failResponse) { + expect(failResponse).toBe(largeObject); + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + test( + "large call response", + async () => { + const thirdPartyServer = serve({ + async fetch() { + return new Response(largeObject, { status: 200 }); + }, + port: THIRD_PARTY_PORT, + }); + + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 6, + waitFor: 9000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + // sleeping to avoid checking input before the first step + await context.sleep("sleeping", 1); + + const input = context.requestPayload; + expect(input).toBe("my-payload"); + + const { status, body } = await context.call("call to large object", { + url: LOCAL_THIRD_PARTY_URL, + body: input, + method: "POST", + }); + + expect(status).toBe(200); + expect(body).toBe(largeObject); + + await context.sleep("sleep", 1); + + finishState.finish(); + }, + }); + + thirdPartyServer.stop(); + }, + { + timeout: 10_000, + } + ); + }); }); diff --git a/src/serve/authorization.test.ts b/src/serve/authorization.test.ts index 54109fc..c58b107 100644 --- a/src/serve/authorization.test.ts +++ b/src/serve/authorization.test.ts @@ -212,6 +212,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "0", @@ -265,6 +266,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -319,6 +321,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 1cfee0e..132a181 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -130,6 +130,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-retries": "3", "upstash-method": "POST", @@ -156,6 +157,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -329,6 +331,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -373,6 +376,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -414,6 +418,8 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", + "upstash-failure-callback-upstash-workflow-runid": "wfr-bar", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -463,12 +469,14 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", + "upstash-failure-callback-upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback": WORKFLOW_ENDPOINT, "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index f2f940e..b0098cf 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -392,6 +392,7 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); expect(timeoutHeaders).toBeUndefined(); @@ -418,6 +419,7 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); expect(timeoutHeaders).toBeUndefined(); @@ -451,10 +453,11 @@ describe("Workflow Requests", () => { } ); expect(headers).toEqual({ - [WORKFLOW_FEATURE_HEADER]: "WF_NoDelete", [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "WF_NoDelete", + "Upstash-Callback-Feature-Set": "LazyFetch", "Upstash-Retries": "0", "Upstash-Callback": WORKFLOW_ENDPOINT, "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", @@ -487,6 +490,8 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [`Upstash-Failure-Callback-${WORKFLOW_ID_HEADER}`]: workflowRunId, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, [`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`]: "true", "Upstash-Failure-Callback": failureUrl, @@ -513,6 +518,7 @@ describe("Workflow Requests", () => { "Upstash-Workflow-Init": "false", "Upstash-Workflow-RunId": workflowRunId, "Upstash-Workflow-Url": WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch", "Upstash-Forward-Upstash-Workflow-Sdk-Version": "1", "Upstash-Workflow-CallType": "step", }); @@ -520,6 +526,7 @@ describe("Workflow Requests", () => { "Upstash-Workflow-Init": ["false"], "Upstash-Workflow-RunId": [workflowRunId], "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], + [WORKFLOW_FEATURE_HEADER]: ["LazyFetch"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], "Upstash-Workflow-Runid": [workflowRunId], "Upstash-Workflow-CallType": ["step"], diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index c49ec57..e1c9b24 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -22,6 +22,7 @@ import type { } from "./types"; import { StepTypes } from "./types"; import type { WorkflowLogger } from "./logger"; +import { getSteps } from "./client/utils"; export const triggerFirstInvocation = async ( workflowContext: WorkflowContext, @@ -158,7 +159,32 @@ export const handleThirdPartyCallResult = async ( > => { try { if (request.headers.get("Upstash-Workflow-Callback")) { - const callbackMessage = JSON.parse(requestPayload) as { + let callbackPayload: string; + if (requestPayload) { + callbackPayload = requestPayload; + } else { + const workflowRunId = request.headers.get("upstash-workflow-runid"); + const messageId = request.headers.get("upstash-message-id"); + + if (!workflowRunId) + throw new WorkflowError("workflow run id missing in context.call lazy fetch."); + if (!messageId) throw new WorkflowError("message id missing in context.call lazy fetch."); + + const steps = await getSteps(client.http, workflowRunId, debug); + const failingStep = steps.find((step) => step.messageId === messageId); + + if (!failingStep) + throw new WorkflowError( + "Failed to submit the context.call." + + (steps.length === 0 + ? "No steps found." + : `No step was found with matching messageId ${messageId} out of ${steps.length} steps.`) + ); + + callbackPayload = atob(failingStep.body); + } + + const callbackMessage = JSON.parse(callbackPayload) as { status: number; body: string; retried?: number; // only set after the first try @@ -295,6 +321,7 @@ export const getHeaders = ( [WORKFLOW_INIT_HEADER]: initHeaderValue, [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: workflowUrl, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch", }; if (!step?.callUrl) { @@ -306,6 +333,7 @@ export const getHeaders = ( baseHeaders[`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`] = "true"; } baseHeaders["Upstash-Failure-Callback"] = failureUrl; + baseHeaders[`Upstash-Failure-Callback-${WORKFLOW_ID_HEADER}`] = workflowRunId; } // if retries is set or if call url is passed, set a retry @@ -355,6 +383,7 @@ export const getHeaders = ( "Upstash-Callback-Workflow-CallType": "fromCallback", "Upstash-Callback-Workflow-Init": "false", "Upstash-Callback-Workflow-Url": workflowUrl, + "Upstash-Callback-Feature-Set": "LazyFetch", "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", "Upstash-Callback-Forward-Upstash-Workflow-StepId": step.stepId.toString(), From 14dce13108a9c66fe02554f4e87f2e611c241de5 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 31 Oct 2024 19:09:27 +0300 Subject: [PATCH 6/8] fix: add lazyFetch for parallel --- src/client/utils.ts | 43 +++++++++++++++++++++++++++++++++++----- src/integration.test.ts | 7 ++++--- src/serve/index.ts | 1 + src/workflow-parser.ts | 26 +++++++++++++----------- src/workflow-requests.ts | 2 +- 5 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/client/utils.ts b/src/client/utils.ts index 1533661..516ecbe 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -1,6 +1,7 @@ import { Client } from "@upstash/qstash"; import { NotifyResponse, RawStep, Waiter } from "../types"; import { WorkflowLogger } from "../logger"; +import { WorkflowError } from "../error"; export const makeNotifyRequest = async ( requester: Client["http"], @@ -39,11 +40,43 @@ export const makeCancelRequest = async (requester: Client["http"], workflowRunId export const getSteps = async ( requester: Client["http"], workflowRunId: string, + messageId?: string, debug?: WorkflowLogger ): Promise => { - await debug?.log("INFO", "ENDPOINT_START", "Pulling steps from QStash."); - return (await requester.request({ - path: ["v2", "workflows", "runs", workflowRunId], - parseResponseAsJson: true, - })) as RawStep[]; + try { + const steps = (await requester.request({ + path: ["v2", "workflows", "runs", workflowRunId], + parseResponseAsJson: true, + })) as RawStep[]; + + if (!messageId) { + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + `and returned them without filtering with messageId.`, + }); + return steps; + } else { + const index = steps.findIndex((item) => item.messageId === messageId); + + if (index === -1) { + // targetMessageId not found, return an empty array or handle it as needed + return []; + } + + const filteredSteps = steps.slice(0, index + 1); + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + `and filtered them to ${filteredSteps.length} using messageId.`, + }); + return filteredSteps; + } + } catch (error) { + await debug?.log("ERROR", "ERROR", { + message: "failed while fetching steps.", + error: error, + }); + throw new WorkflowError(`Failed while pulling steps. ${error}`); + } }; diff --git a/src/integration.test.ts b/src/integration.test.ts index 3a1d395..010f5c1 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -783,7 +783,7 @@ describe.skip("live serve tests", () => { } ); - describe("lazy fetch", () => { + describe.skip("lazy fetch", () => { // create 5 mb payload. // lazy fetch will become enabled for payloads larger than 3mb const largeObject = "x".repeat(4 * 1024 * 1024); @@ -815,12 +815,12 @@ describe.skip("live serve tests", () => { timeout: 10_000, } ); - test.skip( + test( "large parallel step response", async () => { const finishState = new FinishState(); await testEndpoint({ - finalCount: 4, + finalCount: 11, waitFor: 7000, initialPayload: "my-payload", finishState, @@ -885,6 +885,7 @@ describe.skip("live serve tests", () => { timeout: 10_000, } ); + test( "large call response", async () => { diff --git a/src/serve/index.ts b/src/serve/index.ts index 3439fda..670b407 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -82,6 +82,7 @@ export const serve = < isFirstInvocation, workflowRunId, qstashClient.http, + request.headers.get("upstash-message-id")!, debug ); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 0bbcd8c..af77e54 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -225,6 +225,7 @@ export const parseRequest = async ( isFirstInvocation: boolean, workflowRunId: string, requester: Client["http"], + messageId?: string, debug?: WorkflowLogger ): Promise<{ rawInitialPayload: string; @@ -247,7 +248,7 @@ export const parseRequest = async ( ); } const { rawInitialPayload, steps } = await parsePayload( - requestPayload ? requestPayload : await getSteps(requester, workflowRunId), + requestPayload ? requestPayload : await getSteps(requester, workflowRunId, messageId, debug), debug ); const isLastDuplicate = await checkIfLastOneIsDuplicate(steps, debug); @@ -296,17 +297,17 @@ export const handleFailure = async ( } try { - const { status, header, body, url, sourceHeader, sourceBody, workflowRunId } = JSON.parse( - requestPayload - ) as { - status: number; - header: Record; - body: string; - url: string; - sourceHeader: Record; - sourceBody: string; - workflowRunId: string; - }; + const { status, header, body, url, sourceHeader, sourceBody, workflowRunId, sourceMessageId } = + JSON.parse(requestPayload) as { + status: number; + header: Record; + body: string; + url: string; + sourceHeader: Record; + sourceBody: string; + workflowRunId: string; + sourceMessageId: string; + }; const decodedBody = body ? decodeBase64(body) : "{}"; const errorPayload = JSON.parse(decodedBody) as FailureFunctionPayload; @@ -322,6 +323,7 @@ export const handleFailure = async ( false, workflowRunId, qstashClient.http, + sourceMessageId, debug ); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index e1c9b24..ac3d061 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -170,7 +170,7 @@ export const handleThirdPartyCallResult = async ( throw new WorkflowError("workflow run id missing in context.call lazy fetch."); if (!messageId) throw new WorkflowError("message id missing in context.call lazy fetch."); - const steps = await getSteps(client.http, workflowRunId, debug); + const steps = await getSteps(client.http, workflowRunId, messageId, debug); const failingStep = steps.find((step) => step.messageId === messageId); if (!failingStep) From 6db31cea9d3741e578ac0dfaedd46b45df083131 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 1 Nov 2024 09:45:23 +0300 Subject: [PATCH 7/8] fix: docstings --- src/client/index.ts | 2 ++ src/client/utils.ts | 13 ++++++++++--- src/context/context.ts | 9 ++++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 8304503..ee07981 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -26,6 +26,8 @@ export class Client { /** * Cancel an ongoing workflow * + * Returns true if workflow is canceled succesfully. Otherwise, throws error. + * * ```ts * import { Client } from "@upstash/workflow"; * diff --git a/src/client/utils.ts b/src/client/utils.ts index 516ecbe..7c7f589 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -28,13 +28,20 @@ export const makeGetWaitersRequest = async ( return result; }; +/** + * Returns true if workflow is canceled succesfully. Otherwise, throws error. + * + * @param requester client.http + * @param workflowRunId workflow to cancel + * @returns true if workflow is canceled + */ export const makeCancelRequest = async (requester: Client["http"], workflowRunId: string) => { - const result = (await requester.request({ + (await requester.request({ path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`], method: "DELETE", parseResponseAsJson: false, - })) as { error: string } | undefined; - return result ?? true; + })) as undefined; + return true; }; export const getSteps = async ( diff --git a/src/context/context.ts b/src/context/context.ts index 930b125..322fff8 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -388,7 +388,7 @@ export class WorkflowContext { } /** - * Notify waiting workflow runs + * Notify workflow runs waiting for an event * * ```ts * const { eventId, eventData, notifyResponse } = await context.notify( @@ -396,6 +396,13 @@ export class WorkflowContext { * ); * ``` * + * Upon `context.notify`, the workflow runs waiting for the given eventId (context.waitForEvent) + * will receive the given event data and resume execution. + * + * The response includes the same eventId and eventData. Additionally, there is + * a notifyResponse field which contains a list of `Waiter` objects, each corresponding + * to a notified workflow run. + * * @param stepName * @param eventId event id to notify * @param eventData event data to notify with From 87b6eb83fdd5dfd5b851ee4d2c40e36089ac9dfe Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 1 Nov 2024 17:39:07 +0300 Subject: [PATCH 8/8] fix: tests --- src/context/auto-executor.test.ts | 2 ++ src/context/steps.test.ts | 2 +- src/serve/serve.test.ts | 54 ++++++++++++------------------- src/types.ts | 3 +- 4 files changed, 24 insertions(+), 37 deletions(-) diff --git a/src/context/auto-executor.test.ts b/src/context/auto-executor.test.ts index 869e7dc..eb15c5d 100644 --- a/src/context/auto-executor.test.ts +++ b/src/context/auto-executor.test.ts @@ -229,6 +229,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-delay": "10m", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -258,6 +259,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", diff --git a/src/context/steps.test.ts b/src/context/steps.test.ts index e818778..4c7a288 100644 --- a/src/context/steps.test.ts +++ b/src/context/steps.test.ts @@ -93,7 +93,7 @@ describe("test steps", () => { concurrent, targetStep, }); - }) + }); test("should create result step", async () => { expect(await stepWithDuration.getResultStep(6, stepId)).toEqual({ diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 2688aa2..e27f6ae 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -636,12 +636,15 @@ describe("serve", () => { test("should send waitForEvent", async () => { const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []); - const { handler: endpoint } = serve(async (context) => { - await context.waitForEvent("waiting step", "wait-event-id", "10d") - }, { - qstashClient, - receiver: undefined, - }); + const { handler: endpoint } = serve( + async (context) => { + await context.waitForEvent("waiting step", "wait-event-id", "10d"); + }, + { + qstashClient, + receiver: undefined, + } + ); let called = false; await mockQStashServer({ execute: async () => { @@ -663,38 +666,21 @@ describe("serve", () => { }, timeout: "10d", timeoutHeaders: { - "Content-Type": [ - "application/json" - ], - "Upstash-Forward-Upstash-Workflow-Sdk-Version": [ - "1" - ], - "Upstash-Retries": [ - "3" - ], - "Upstash-Workflow-CallType": [ - "step" - ], - "Upstash-Workflow-Init": [ - "false" - ], - "Upstash-Workflow-RunId": [ - "wfr-bar" - ], - "Upstash-Workflow-Runid": [ - "wfr-bar" - ], - "Upstash-Workflow-Url": [ - WORKFLOW_ENDPOINT - ], + "Content-Type": ["application/json"], + "Upstash-Feature-Set": ["LazyFetch"], + "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Retries": ["3"], + "Upstash-Workflow-CallType": ["step"], + "Upstash-Workflow-Init": ["false"], + "Upstash-Workflow-RunId": ["wfr-bar"], + "Upstash-Workflow-Runid": ["wfr-bar"], + "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], }, timeoutUrl: WORKFLOW_ENDPOINT, url: WORKFLOW_ENDPOINT, - } + }, }, }); expect(called).toBeTrue(); - - }) - + }); }); diff --git a/src/types.ts b/src/types.ts index 5f1e739..ef31826 100644 --- a/src/types.ts +++ b/src/types.ts @@ -292,5 +292,4 @@ export type CallResponse = { header: Record; }; - -export type Duration = `${bigint}s` | `${bigint}m` | `${bigint}h` | `${bigint}d` \ No newline at end of file +export type Duration = `${bigint}s` | `${bigint}m` | `${bigint}h` | `${bigint}d`;