Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/strange-moles-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL
6 changes: 6 additions & 0 deletions .server-changes/dev-cli-disconnect-md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Added `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. Maximum of 500 runs can be cancelled. Uses the bulk action system when there are more than 25 runs to cancel.
180 changes: 180 additions & 0 deletions apps/webapp/app/routes/engine.v1.dev.disconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import { json } from "@remix-run/server-runtime";
import { Ratelimit } from "@upstash/ratelimit";
import { tryCatch } from "@trigger.dev/core";
import { DevDisconnectRequestBody } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { RateLimiter } from "~/services/rateLimiter.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
import { commonWorker } from "~/v3/commonWorker.server";
import pMap from "p-map";

const CANCEL_REASON = "Dev session ended (CLI exited)";

// Below this threshold, cancel runs inline with pMap.
// Above it, create a bulk action and process asynchronously.
const BULK_ACTION_THRESHOLD = 25;

// Maximum number of runs that can be cancelled in a single disconnect call.
const MAX_RUNS = 500;

// Rate limit: 5 calls per minute per environment
const disconnectRateLimiter = new RateLimiter({
keyPrefix: "dev-disconnect",
limiter: Ratelimit.fixedWindow(5, "1 m"),
logFailure: true,
});

const { action } = createActionApiRoute(
{
body: DevDisconnectRequestBody,
maxContentLength: 1024 * 256, // 256KB
method: "POST",
},
async ({ authentication, body }) => {
// Only allow dev environments — this endpoint uses finalizeRun which
// skips PENDING_CANCEL and immediately finalizes executing runs.
if (authentication.environment.type !== "DEVELOPMENT") {
return json({ error: "This endpoint is only available for dev environments" }, { status: 403 });
}

const environmentId = authentication.environment.id;

// Rate limit per environment
const rateLimitResult = await disconnectRateLimiter.limit(environmentId);
if (!rateLimitResult.success) {
return json(
{ error: "Rate limit exceeded", retryAfter: Math.ceil((rateLimitResult.reset - Date.now()) / 1000) },
{ status: 429 }
);
}

if (body.runFriendlyIds.length > MAX_RUNS) {
return json(
{ error: `A maximum of ${MAX_RUNS} runs can be cancelled per request` },
{ status: 400 }
);
}

const { runFriendlyIds } = body;

if (runFriendlyIds.length === 0) {
return json({ cancelled: 0 }, { status: 200 });
}

logger.info("Dev disconnect: cancelling runs", {
environmentId,
runCount: runFriendlyIds.length,
});

// For small numbers of runs, cancel inline
if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) {
const cancelled = await cancelRunsInline(runFriendlyIds, environmentId);
return json({ cancelled }, { status: 200 });
}

// For large numbers, create a bulk action to process asynchronously
const bulkActionId = await createBulkCancelAction(
runFriendlyIds,
authentication.environment.project.id,
environmentId
);

logger.info("Dev disconnect: created bulk action for large run set", {
environmentId,
bulkActionId,
runCount: runFriendlyIds.length,
});

return json({ cancelled: 0, bulkActionId }, { status: 200 });
}
);

async function cancelRunsInline(
runFriendlyIds: string[],
environmentId: string
): Promise<number> {
const runIds = runFriendlyIds.map((fid) => RunId.toId(fid));

const runs = await prisma.taskRun.findMany({
where: {
id: { in: runIds },
runtimeEnvironmentId: environmentId,
},
select: {
id: true,
engine: true,
friendlyId: true,
status: true,
createdAt: true,
completedAt: true,
taskEventStore: true,
},
});

let cancelled = 0;
const cancelService = new CancelTaskRunService(prisma);

await pMap(
runs,
async (run) => {
const [error, result] = await tryCatch(
cancelService.call(run, { reason: CANCEL_REASON, finalizeRun: true })
);

if (error) {
logger.error("Dev disconnect: failed to cancel run", {
runId: run.id,
error,
});
} else if (result && !result.alreadyFinished) {
cancelled++;
}
},
{ concurrency: 10 }
);

logger.info("Dev disconnect: completed inline cancellation", {
environmentId,
cancelled,
total: runFriendlyIds.length,
});

return cancelled;
}

async function createBulkCancelAction(
runFriendlyIds: string[],
projectId: string,
environmentId: string
): Promise<string> {
const { id, friendlyId } = BulkActionId.generate();

await prisma.bulkActionGroup.create({
data: {
id,
friendlyId,
projectId,
environmentId,
name: "Dev session disconnect",
type: BulkActionType.CANCEL,
params: { runId: runFriendlyIds, finalizeRun: true },
queryName: "bulk_action_v1",
totalCount: runFriendlyIds.length,
completionNotification: BulkActionNotificationType.NONE,
},
});

await commonWorker.enqueue({
id: `processBulkAction-${id}`,
job: "processBulkAction",
payload: { bulkActionId: id },
});

return friendlyId;
}

export { action };
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ export class BulkActionService extends BaseService {
}

// 2. Parse the params
const rawParams = group.params && typeof group.params === "object" ? group.params : {};
const finalizeRun = "finalizeRun" in rawParams && (rawParams as any).finalizeRun === true;
const filters = parseRunListInputOptions({
organizationId: group.project.organizationId,
projectId: group.projectId,
environmentId: group.environmentId,
...(group.params && typeof group.params === "object" ? group.params : {}),
...rawParams,
});

const runsRepository = new RunsRepository({
Expand Down Expand Up @@ -199,6 +201,7 @@ export class BulkActionService extends BaseService {
cancelService.call(run, {
reason: `Bulk action ${group.friendlyId} cancelled run`,
bulkActionId: bulkActionId,
finalizeRun,
})
);
if (error) {
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = {
cancelAttempts?: boolean;
cancelledAt?: Date;
bulkActionId?: string;
/** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */
finalizeRun?: boolean;
};

type CancelTaskRunServiceResult = {
Expand Down Expand Up @@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService {
runId: taskRun.id,
completedAt: options?.cancelledAt,
reason: options?.reason,
finalizeRun: options?.finalizeRun,
bulkActionId: options?.bulkActionId,
tx: this._prisma,
});
Expand Down
52 changes: 28 additions & 24 deletions internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1436,35 +1436,39 @@ export class RunAttemptSystem {
});

//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
//unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED
if (
isExecuting(latestSnapshot.executionStatus) ||
isPendingExecuting(latestSnapshot.executionStatus)
) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});
if (!finalizeRun) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});

//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
}
// finalizeRun is true — fall through to finish the run immediately
}

//not executing, so we will actually finish the run
Expand Down
20 changes: 20 additions & 0 deletions packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
DevConfigResponseBody,
DevDequeueRequestBody,
DevDequeueResponseBody,
DevDisconnectRequestBody,
DevDisconnectResponseBody,
EnvironmentVariableResponseBody,
FailDeploymentRequestBody,
FailDeploymentResponseBody,
Expand Down Expand Up @@ -557,6 +559,7 @@ export class CliApiClient {
heartbeatRun: this.devHeartbeatRun.bind(this),
startRunAttempt: this.devStartRunAttempt.bind(this),
completeRunAttempt: this.devCompleteRunAttempt.bind(this),
disconnect: this.devDisconnect.bind(this),
setEngineURL: this.setEngineURL.bind(this),
} as const;
}
Expand Down Expand Up @@ -681,6 +684,23 @@ export class CliApiClient {
return eventSource;
}

private async devDisconnect(
body: DevDisconnectRequestBody
): Promise<ApiResult<DevDisconnectResponseBody>> {
if (!this.accessToken) {
throw new Error("devDisconnect: No access token");
}

return wrapZodFetch(DevDisconnectResponseBody, `${this.engineURL}/engine/v1/dev/disconnect`, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
Accept: "application/json",
},
body: JSON.stringify(body),
});
}

private async devDequeue(
body: DevDequeueRequestBody
): Promise<ApiResult<DevDequeueResponseBody>> {
Expand Down
Loading