Skip to content

Commit dde4fe5

Browse files
committed
🤖 ci: add compaction crash/restart integration tests
1 parent b544b4f commit dde4fe5

File tree

4 files changed

+631
-0
lines changed

4 files changed

+631
-0
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
import * as path from "path";
2+
import * as fs from "fs/promises";
3+
import * as fsSync from "fs";
4+
import { spawn } from "child_process";
5+
import type { WorkspaceChatMessage } from "@/common/orpc/types";
6+
import {
7+
shouldRunIntegrationTests,
8+
createTestEnvironment,
9+
createTestEnvironmentFromRootDir,
10+
cleanupTestEnvironment,
11+
} from "./setup";
12+
import {
13+
createTempGitRepo,
14+
cleanupTempGitRepo,
15+
createWorkspace,
16+
generateBranchName,
17+
} from "./helpers";
18+
import { HistoryService } from "../../src/node/services/historyService";
19+
import { createMuxMessage } from "../../src/common/types/message";
20+
21+
const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip;
22+
23+
function isChatMessage(event: WorkspaceChatMessage): event is WorkspaceChatMessage & {
24+
type: "message";
25+
role: "user" | "assistant";
26+
metadata?: { compacted?: unknown };
27+
} {
28+
return (
29+
typeof event === "object" &&
30+
event !== null &&
31+
"type" in event &&
32+
(event as { type?: unknown }).type === "message" &&
33+
"role" in event
34+
);
35+
}
36+
37+
async function waitForChatJsonlTmpFile(sessionDir: string, timeoutMs: number): Promise<void> {
38+
const deadline = Date.now() + timeoutMs;
39+
40+
const hasTmp = async (): Promise<boolean> => {
41+
const names = await fs.readdir(sessionDir).catch(() => [] as string[]);
42+
return names.some((name) => name.startsWith("chat.jsonl."));
43+
};
44+
45+
if (await hasTmp()) {
46+
return;
47+
}
48+
49+
await new Promise<void>((resolve, reject) => {
50+
let settled = false;
51+
52+
const finish = (error?: Error) => {
53+
if (settled) return;
54+
settled = true;
55+
clearInterval(poller);
56+
clearTimeout(timeout);
57+
try {
58+
watcher.close();
59+
} catch {
60+
// ignore
61+
}
62+
if (error) {
63+
reject(error);
64+
} else {
65+
resolve();
66+
}
67+
};
68+
69+
const timeout = setTimeout(
70+
() => {
71+
finish(new Error(`Timed out waiting for write-file-atomic tmpfile in ${sessionDir}`));
72+
},
73+
Math.max(0, deadline - Date.now())
74+
);
75+
76+
// Polling + fs.watch: watch is fast when it works; polling is the fallback.
77+
const poller = setInterval(() => {
78+
void hasTmp().then((ok) => {
79+
if (ok) finish();
80+
});
81+
}, 5);
82+
83+
const watcher = fsSync.watch(sessionDir, { persistent: false }, (_event, filename) => {
84+
if (filename && filename.startsWith("chat.jsonl.")) {
85+
finish();
86+
return;
87+
}
88+
void hasTmp().then((ok) => {
89+
if (ok) finish();
90+
});
91+
});
92+
});
93+
}
94+
95+
describeIntegration("compaction durability", () => {
96+
test.concurrent(
97+
"should not lose history if process is SIGKILLed during atomic replaceChatHistory",
98+
async () => {
99+
if (process.platform === "win32") {
100+
// SIGKILL isn't supported on Windows.
101+
return;
102+
}
103+
104+
const tempGitRepo = await createTempGitRepo();
105+
const env1 = await createTestEnvironment();
106+
107+
try {
108+
const branchName = generateBranchName("atomic-replace");
109+
const createResult = await createWorkspace(env1, tempGitRepo, branchName);
110+
expect(createResult.success).toBe(true);
111+
if (!createResult.success) {
112+
throw new Error(createResult.error);
113+
}
114+
115+
const workspaceId = createResult.metadata.id;
116+
if (!workspaceId) {
117+
throw new Error("Workspace ID not returned from creation");
118+
}
119+
120+
// Seed history (no LLM): create a small, valid chat.jsonl we can later replay via ORPC.
121+
const historyService = new HistoryService(env1.config);
122+
const seededMessages = [
123+
createMuxMessage("seed-1", "user", "hello", {}),
124+
createMuxMessage("seed-2", "assistant", "hi", {}),
125+
createMuxMessage("seed-3", "user", "how are you", {}),
126+
createMuxMessage("seed-4", "assistant", "fine", {}),
127+
];
128+
129+
for (const msg of seededMessages) {
130+
const result = await historyService.appendToHistory(workspaceId, msg);
131+
expect(result.success).toBe(true);
132+
if (!result.success) {
133+
throw new Error(result.error);
134+
}
135+
}
136+
137+
// Stop the in-process backend, but keep rootDir on disk.
138+
await env1.services.dispose();
139+
await env1.services.shutdown();
140+
141+
const rootDir = env1.tempDir;
142+
const sessionDir = path.join(rootDir, "sessions", workspaceId);
143+
const workerScript = path.join(__dirname, "workers", "replaceChatHistoryWorker.ts");
144+
145+
const worker = spawn("bun", [workerScript], {
146+
cwd: process.cwd(),
147+
env: {
148+
...process.env,
149+
MUX_TEST_ROOT_DIR: rootDir,
150+
MUX_TEST_WORKSPACE_ID: workspaceId,
151+
// Keep the atomic write busy long enough to observe the tmp file.
152+
MUX_TEST_SUMMARY_BYTES: "50000000",
153+
},
154+
stdio: ["ignore", "ignore", "pipe"],
155+
});
156+
157+
let workerStderr = "";
158+
worker.stderr?.on("data", (chunk) => {
159+
workerStderr += chunk.toString();
160+
});
161+
162+
const exited = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>(
163+
(resolve) => {
164+
worker.once("exit", (code, signal) => resolve({ code, signal }));
165+
}
166+
);
167+
168+
const race = await Promise.race([
169+
waitForChatJsonlTmpFile(sessionDir, 20000).then(() => "tmp" as const),
170+
exited.then(() => "exit" as const),
171+
]);
172+
173+
if (race === "tmp") {
174+
// Simulate an abrupt crash.
175+
worker.kill("SIGKILL");
176+
}
177+
178+
const exitInfo = await exited;
179+
if (exitInfo.code !== 0 && exitInfo.signal !== "SIGKILL") {
180+
throw new Error(
181+
`Worker failed: code=${exitInfo.code} signal=${exitInfo.signal}\n${workerStderr}`
182+
);
183+
}
184+
185+
// Restart backend and assert behaviorally via ORPC history replay.
186+
const env2 = await createTestEnvironmentFromRootDir(rootDir);
187+
try {
188+
const replay = await env2.orpc.workspace.getFullReplay({ workspaceId });
189+
const replayedMessages = replay.filter(isChatMessage);
190+
191+
expect([seededMessages.length, 1]).toContain(replayedMessages.length);
192+
193+
if (replayedMessages.length === 1) {
194+
expect(replayedMessages[0].metadata?.compacted).toBeTruthy();
195+
}
196+
} finally {
197+
// Best-effort: remove workspace before tearing down rootDir.
198+
try {
199+
await env2.orpc.workspace.remove({ workspaceId, options: { force: true } });
200+
} catch {
201+
// ignore
202+
}
203+
await cleanupTestEnvironment(env2);
204+
}
205+
} finally {
206+
await cleanupTempGitRepo(tempGitRepo);
207+
}
208+
},
209+
60000
210+
);
211+
});

0 commit comments

Comments
 (0)