6 changes: 6 additions & 0 deletions .server-changes/replication-error-recovery.md
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Runs and sessions replication services now auto-recover from stream errors (e.g. after a Postgres failover) instead of silently leaving replication stopped. Behaviour is configurable per service — reconnect in-process (default), exit so a process supervisor can restart the host, or log the error only.
23 changes: 23 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -1306,6 +1306,16 @@ const EnvironmentSchema = z
RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),
// What to do when the runs replication client errors (e.g. after a
// Postgres failover). `reconnect` (default) re-subscribes in-process with
// exponential backoff; `exit` exits the process so a supervisor restarts
// it; `log` preserves the old no-op behaviour. Reconnect tuning is
// shared across both replication services via REPLICATION_RECONNECT_*.
RUN_REPLICATION_ERROR_STRATEGY: z
.enum(["reconnect", "exit", "log"])
.default("reconnect"),
RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
@@ -1338,6 +1348,19 @@ const EnvironmentSchema = z
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
// Error recovery — same semantics as RUN_REPLICATION_ERROR_STRATEGY.
SESSION_REPLICATION_ERROR_STRATEGY: z
.enum(["reconnect", "exit", "log"])
.default("reconnect"),
SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),

// Reconnect tuning shared across both replication services. Only
// applies when error strategy is `reconnect`. Max attempts of 0 means
// unlimited (default).
REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000),
REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000),
REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0),

// Clickhouse
CLICKHOUSE_URL: z.string(),
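For reference, a hedged sketch of how the new variables might be combined. The names match the schema above; the values (mostly the schema defaults) are illustrative only and are written as the strings they would arrive as via process.env.

// Illustrative only, not a recommendation for any particular deployment.
const exampleReplicationEnv = {
  // Runs replicator: reconnect in-process (the default strategy).
  RUN_REPLICATION_ERROR_STRATEGY: "reconnect",
  // Sessions replicator: exit after 5s with code 1 so a supervisor restarts it.
  SESSION_REPLICATION_ERROR_STRATEGY: "exit",
  SESSION_REPLICATION_EXIT_DELAY_MS: "5000",
  SESSION_REPLICATION_EXIT_CODE: "1",
  // Shared reconnect tuning (only read when a strategy is "reconnect").
  REPLICATION_RECONNECT_INITIAL_DELAY_MS: "1000",
  REPLICATION_RECONNECT_MAX_DELAY_MS: "60000",
  REPLICATION_RECONNECT_MAX_ATTEMPTS: "0", // 0 = retry forever
};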
194 changes: 194 additions & 0 deletions apps/webapp/app/services/replicationErrorRecovery.server.ts
@@ -0,0 +1,194 @@
import { Logger } from "@trigger.dev/core/logger";

// When the LogicalReplicationClient's WAL stream errors (e.g. after a
// Postgres failover) it calls stop() on itself and stays stopped. The host
// service has to decide how to recover. Three strategies are available:
//
// - "reconnect" — re-subscribe in-process with exponential backoff. Default;
// works without a process supervisor.
// - "exit" — exit the process so an external supervisor (Docker
// restart=always, ECS, systemd, k8s, ...) replaces it. Recommended when a
// supervisor is present because it gets a clean slate every time.
// - "log" — preserve the historical no-op behaviour. Useful for
// debugging or in test environments where you want to observe the
// silent-death failure mode.
export type ReplicationErrorRecoveryStrategy =
| {
type: "reconnect";
initialDelayMs?: number;
maxDelayMs?: number;
// 0 (or undefined) means retry forever.
maxAttempts?: number;
}
| {
type: "exit";
exitDelayMs?: number;
exitCode?: number;
}
| { type: "log" };

export type ReplicationErrorRecoveryDeps = {
strategy: ReplicationErrorRecoveryStrategy;
logger: Logger;
// Re-subscribe the underlying replication client. Implementations should
// call client.subscribe(...) and resolve once the stream is started.
reconnect: () => Promise<void>;
// True once the host service has begun graceful shutdown — recovery
// suppresses all work in that state.
isShuttingDown: () => boolean;
};

export type ReplicationErrorRecovery = {
// Called from the replication client's "error" event handler.
handle(error: unknown): void;
// Called from the replication client's "start" event handler. Resets the
// reconnect attempt counter so the next failure starts from initialDelayMs.
notifyStreamStarted(): void;
// Cancel any pending reconnect/exit timer. Called from shutdown().
dispose(): void;
};

export function createReplicationErrorRecovery(
deps: ReplicationErrorRecoveryDeps
): ReplicationErrorRecovery {
const { strategy, logger, reconnect, isShuttingDown } = deps;
let attempt = 0;
let pendingReconnect: NodeJS.Timeout | null = null;
let pendingExit: NodeJS.Timeout | null = null;
let exiting = false;

function scheduleReconnect(error: unknown): void {
if (strategy.type !== "reconnect") return;
if (pendingReconnect) return;

attempt += 1;
const maxAttempts = strategy.maxAttempts ?? 0;
if (maxAttempts > 0 && attempt > maxAttempts) {
logger.error("Replication reconnect exceeded maxAttempts; giving up", {
attempt,
maxAttempts,
error,
});
return;
}

const initialDelay = strategy.initialDelayMs ?? 1_000;
const maxDelay = strategy.maxDelayMs ?? 60_000;
const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay);

logger.error("Replication stream lost — scheduling reconnect", {
attempt,
delayMs: delay,
error,
});

pendingReconnect = setTimeout(async () => {
pendingReconnect = null;
if (isShuttingDown()) return;

try {
await reconnect();
// Success path is handled by notifyStreamStarted, which fires from
// the replication client's "start" event after the stream is live.
} catch (err) {
// subscribe() can throw without first emitting an "error" event —
// notably when the initial pg client.connect() fails because Postgres
// is still unreachable mid-failover. Schedule the next attempt
// ourselves so recovery doesn't silently stop. If subscribe() did
// also emit an "error" event, handle() will call scheduleReconnect()
// first; the guard on pendingReconnect makes this idempotent.
logger.error("Replication reconnect attempt failed", {
attempt,
error: err,
});
scheduleReconnect(err);
}
}, delay);
Review comment on lines +85 to +106:

🚩 Reconnect silently stalls if subscribe() fails to acquire the leader lock

When the reconnect callback calls this._replicationClient.subscribe(...) (replicationErrorRecovery.server.ts:90), the subscribe() method in client.ts:240 may fail to acquire the leader lock (lines 254-258). In that case, it emits leaderElection(false), calls this.stop(), and returns — without throwing and without emitting an error event. From the recovery module's perspective, the reconnect() promise resolved successfully, so it does not schedule another attempt. But notifyStreamStarted() will never fire either (no start event is emitted), so the attempt counter is never reset. The stream is now permanently dead with no further recovery.

In practice this would only occur in multi-replica deployments where another instance wins the leader lock during the reconnect window. Since the other instance is handling replication, this may be acceptable — but the current instance has no path to ever resume if the other instance later dies. This is a design limitation rather than a clear bug, but worth understanding.


}

function scheduleExit(): void {
if (strategy.type !== "exit") return;
if (exiting) return;
exiting = true;

const delay = strategy.exitDelayMs ?? 5_000;
const code = strategy.exitCode ?? 1;

logger.error("Fatal replication error — exiting to let process supervisor restart", {
exitCode: code,
exitDelayMs: delay,
});

pendingExit = setTimeout(() => {
// eslint-disable-next-line no-process-exit
process.exit(code);
}, delay);
// Don't let this timer keep the process alive if it's already shutting down cleanly.
pendingExit.unref();
}

return {
handle(error) {
if (isShuttingDown()) return;
switch (strategy.type) {
case "log":
return;
case "exit":
return scheduleExit();
case "reconnect":
return scheduleReconnect(error);
}
},
notifyStreamStarted() {
if (attempt > 0) {
logger.info("Replication reconnect succeeded", { attempt });
attempt = 0;
}
},
dispose() {
if (pendingReconnect) {
clearTimeout(pendingReconnect);
pendingReconnect = null;
}
if (pendingExit) {
clearTimeout(pendingExit);
pendingExit = null;
}
},
};
}

// Shape of the env-driven configuration object the instance bootstrap files
// build from process.env. Kept separate from the strategy union above so the
// instance code can pass a single object regardless of which strategy is set.
export type ReplicationErrorRecoveryEnv = {
strategy: "reconnect" | "exit" | "log";
reconnectInitialDelayMs?: number;
reconnectMaxDelayMs?: number;
reconnectMaxAttempts?: number;
exitDelayMs?: number;
exitCode?: number;
};

export function strategyFromEnv(
env: ReplicationErrorRecoveryEnv
): ReplicationErrorRecoveryStrategy {
switch (env.strategy) {
case "exit":
return {
type: "exit",
exitDelayMs: env.exitDelayMs,
exitCode: env.exitCode,
};
case "log":
return { type: "log" };
case "reconnect":
default:
return {
type: "reconnect",
initialDelayMs: env.reconnectInitialDelayMs,
maxDelayMs: env.reconnectMaxDelayMs,
maxAttempts: env.reconnectMaxAttempts,
};
}
}
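For intuition about the reconnect strategy's timing: scheduleReconnect() above computes each delay as initialDelayMs * 2^(attempt - 1), capped at maxDelayMs, so with the default tuning the waits are 1s, 2s, 4s, 8s, 16s, 32s and then 60s for every later attempt. A minimal sketch that reproduces the schedule (the length of 8 is arbitrary, for illustration):

// Mirrors the delay formula in scheduleReconnect() with the default tuning
// (initialDelayMs = 1000, maxDelayMs = 60000). The index i is attempt - 1.
const initialDelayMs = 1_000;
const maxDelayMs = 60_000;

const delays = Array.from({ length: 8 }, (_, i) =>
  Math.min(initialDelayMs * Math.pow(2, i), maxDelayMs)
);

console.log(delays); // [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]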
9 changes: 9 additions & 0 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -3,6 +3,7 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { meter, provider } from "~/v3/tracer.server";
import { strategyFromEnv } from "./replicationErrorRecovery.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

@@ -69,6 +70,14 @@ function initializeRunsReplicationInstance()
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1",
disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1",
errorRecovery: strategyFromEnv({
strategy: env.RUN_REPLICATION_ERROR_STRATEGY,
reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS,
reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS,
reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS,
exitDelayMs: env.RUN_REPLICATION_EXIT_DELAY_MS,
exitCode: env.RUN_REPLICATION_EXIT_CODE,
}),
});

if (env.RUN_REPLICATION_ENABLED === "1") {
21 changes: 21 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -29,6 +29,11 @@ import EventEmitter from "node:events";
import pLimit from "p-limit";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
import {
createReplicationErrorRecovery,
type ReplicationErrorRecovery,
type ReplicationErrorRecoveryStrategy,
} from "./replicationErrorRecovery.server";

interface TransactionEvent<T = any> {
tag: "insert" | "update" | "delete";
@@ -73,6 +78,9 @@ export type RunsReplicationServiceOptions = {
insertMaxDelayMs?: number;
disablePayloadInsert?: boolean;
disableErrorFingerprinting?: boolean;
// What to do when the replication client errors (e.g. after a Postgres
// failover). Defaults to in-process reconnect with exponential backoff.
errorRecovery?: ReplicationErrorRecoveryStrategy;
};

type PostgresTaskRun = TaskRun & { masterQueue: string };
@@ -119,6 +127,7 @@ export class RunsReplicationService {
private _insertStrategy: "insert" | "insert_async";
private _disablePayloadInsert: boolean;
private _disableErrorFingerprinting: boolean;
private _errorRecovery: ReplicationErrorRecovery;

// Metrics
private _replicationLagHistogram: Histogram;
@@ -250,14 +259,25 @@
}
});

this._errorRecovery = createReplicationErrorRecovery({
strategy: options.errorRecovery ?? { type: "reconnect" },
logger: this.logger,
reconnect: async () => {
await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined);
},
isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete,
});

this._replicationClient.events.on("error", (error) => {
this.logger.error("Replication client error", {
error,
});
this._errorRecovery.handle(error);
});

this._replicationClient.events.on("start", () => {
this.logger.info("Replication client started");
this._errorRecovery.notifyStreamStarted();
});

this._replicationClient.events.on("acknowledge", ({ lsn }) => {
@@ -278,6 +298,7 @@
if (this._isShuttingDown) return;

this._isShuttingDown = true;
this._errorRecovery.dispose();

this.logger.info("Initiating shutdown of runs replication service");

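To see the integration in isolation, here is a hedged sketch of the wiring that the diff above adds to RunsReplicationService, reduced to the pieces the recovery module actually touches. The replication client and logger are stand-ins (declared, not constructed), so nothing about the real LogicalReplicationClient or Logger APIs is assumed beyond a subscribe() method and an events emitter.

import type { EventEmitter } from "node:events";
import type { Logger } from "@trigger.dev/core/logger";
import {
  createReplicationErrorRecovery,
  strategyFromEnv,
} from "./replicationErrorRecovery.server";

// Stand-ins for the real replication client and logger used by the services.
declare const replicationClient: {
  subscribe: (lsn?: string) => Promise<void>;
  events: EventEmitter;
};
declare const logger: Logger;

let shuttingDown = false;

const errorRecovery = createReplicationErrorRecovery({
  strategy: strategyFromEnv({ strategy: "reconnect" }),
  logger,
  // Re-subscribe the stream; the real services pass the last committed LSN.
  reconnect: () => replicationClient.subscribe(),
  isShuttingDown: () => shuttingDown,
});

// "error" attempts recovery; "start" resets the backoff attempt counter.
replicationClient.events.on("error", (error) => errorRecovery.handle(error));
replicationClient.events.on("start", () => errorRecovery.notifyStreamStarted());

// On graceful shutdown, cancel any pending reconnect or exit timer.
export function shutdown() {
  shuttingDown = true;
  errorRecovery.dispose();
}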
apps/webapp/app/services/sessionsReplicationInstance.server.ts
@@ -3,6 +3,7 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { meter, provider } from "~/v3/tracer.server";
import { strategyFromEnv } from "./replicationErrorRecovery.server";
import { SessionsReplicationService } from "./sessionsReplicationService.server";

export const sessionsReplicationInstance = singleton(
@@ -66,6 +67,14 @@ function initializeSessionsReplicationInstance()
insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS,
insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS,
insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY,
errorRecovery: strategyFromEnv({
strategy: env.SESSION_REPLICATION_ERROR_STRATEGY,
reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS,
reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS,
reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS,
exitDelayMs: env.SESSION_REPLICATION_EXIT_DELAY_MS,
exitCode: env.SESSION_REPLICATION_EXIT_CODE,
}),
});

return service;