From 6ae5a5da1f1caf1743c64449f6d59eeb71386308 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 13:49:38 -0600 Subject: [PATCH 1/9] =?UTF-8?q?=F0=9F=A4=96=20fix:=20make=20compaction=20c?= =?UTF-8?q?ontrol-plane=20and=20atomic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/components/AIView.tsx | 16 +- src/browser/hooks/useIdleCompactionHandler.ts | 52 +- src/browser/utils/chatCommands.ts | 40 +- src/common/orpc/schemas/api.ts | 39 ++ src/node/orpc/router.ts | 17 + src/node/services/agentSession.ts | 198 +++++++ src/node/services/compactionHandler.test.ts | 510 ++++++++---------- src/node/services/compactionHandler.ts | 171 ++++-- src/node/services/historyService.ts | 54 ++ src/node/services/workspaceService.ts | 104 ++++ 10 files changed, 835 insertions(+), 366 deletions(-) diff --git a/src/browser/components/AIView.tsx b/src/browser/components/AIView.tsx index b950e02fa1..fc691d537e 100644 --- a/src/browser/components/AIView.tsx +++ b/src/browser/components/AIView.tsx @@ -62,7 +62,7 @@ import { ConcurrentLocalWarning } from "./ConcurrentLocalWarning"; import { BackgroundProcessesBanner } from "./BackgroundProcessesBanner"; import { useBackgroundBashHandlers } from "@/browser/hooks/useBackgroundBashHandlers"; import { checkAutoCompaction } from "@/browser/utils/compaction/autoCompactionCheck"; -import { executeCompaction } from "@/browser/utils/chatCommands"; + import { useProviderOptions } from "@/browser/hooks/useProviderOptions"; import { useAutoCompactionSettings } from "../hooks/useAutoCompactionSettings"; import { useSendMessageOptions } from "@/browser/hooks/useSendMessageOptions"; @@ -224,13 +224,21 @@ const AIViewInner: React.FC = ({ // We pass a default continueMessage of "Continue" as a resume sentinel so the backend can // auto-send it after compaction. The compaction prompt builder special-cases this sentinel // to avoid injecting it into the summarization request. + // Uses "force-compaction" source to distinguish from user-initiated /compact. const handleForceCompaction = useCallback(() => { if (!api) return; - void executeCompaction({ - api, + // Use compactHistory endpoint with interrupt to ensure immediate compaction + void api.workspace.compactHistory({ workspaceId, - sendMessageOptions: pendingSendOptions, + source: "force-compaction", + interrupt: "abandonPartial", continueMessage: { text: "Continue" }, + sendMessageOptions: { + model: pendingSendOptions.model, + thinkingLevel: pendingSendOptions.thinkingLevel, + providerOptions: pendingSendOptions.providerOptions, + experiments: pendingSendOptions.experiments, + }, }); }, [api, workspaceId, pendingSendOptions]); diff --git a/src/browser/hooks/useIdleCompactionHandler.ts b/src/browser/hooks/useIdleCompactionHandler.ts index 1873b9ab7f..16c07eca6e 100644 --- a/src/browser/hooks/useIdleCompactionHandler.ts +++ b/src/browser/hooks/useIdleCompactionHandler.ts @@ -4,18 +4,13 @@ * The backend's IdleCompactionService detects when workspaces have been idle * for a configured period and emits `idle-compaction-needed` events to the stream. * - * This hook listens for these signals and triggers compaction via the frontend's - * executeCompaction(), which handles gateway, model preferences, etc. - * - * Status display is handled data-driven: the compaction request message includes - * displayStatus metadata, which the aggregator reads to set sidebar status. - * Status is cleared when the summary message with compacted: "idle" arrives. + * This hook listens for these signals and triggers compaction via the control-plane + * compactHistory endpoint, which ensures the compaction cannot be dropped or queued. */ import { useEffect, useRef } from "react"; import type { RouterClient } from "@orpc/server"; import type { AppRouter } from "@/node/orpc/router"; -import { executeCompaction } from "@/browser/utils/chatCommands"; import { buildSendMessageOptions } from "@/browser/hooks/useSendMessageOptions"; import { workspaceStore } from "@/browser/stores/WorkspaceStore"; @@ -47,22 +42,33 @@ export function useIdleCompactionHandler(params: IdleCompactionHandlerParams): v // Use buildSendMessageOptions to get correct model, gateway, thinking level, etc. const sendMessageOptions = buildSendMessageOptions(workspaceId); - // Status is handled data-driven via displayStatus in the message metadata - void executeCompaction({ - api, - workspaceId, - sendMessageOptions, - source: "idle-compaction", - }).then((result) => { - if (!result.success) { - console.error("Idle compaction failed:", result.error); - } - // Always clear from triggered set after completion (success or failure). - // This allows the workspace to be re-triggered on subsequent hourly checks - // if it becomes idle again. Backend eligibility checks (already_compacted, - // currently_streaming) provide authoritative deduplication. - triggeredWorkspacesRef.current.delete(workspaceId); - }); + // Use control-plane compactHistory endpoint for reliability + void api.workspace + .compactHistory({ + workspaceId, + source: "idle-compaction", + sendMessageOptions: { + model: sendMessageOptions.model, + thinkingLevel: sendMessageOptions.thinkingLevel, + providerOptions: sendMessageOptions.providerOptions, + experiments: sendMessageOptions.experiments, + }, + }) + .then((result) => { + if (!result.success) { + console.error("Idle compaction failed:", result.error); + } + }) + .catch((error) => { + console.error("Idle compaction error:", error); + }) + .finally(() => { + // Always clear from triggered set after completion (success or failure). + // This allows the workspace to be re-triggered on subsequent hourly checks + // if it becomes idle again. Backend eligibility checks (already_compacted, + // currently_streaming) provide authoritative deduplication. + triggeredWorkspacesRef.current.delete(workspaceId); + }); }; const unsubscribe = workspaceStore.onIdleCompactionNeeded(handleIdleCompactionNeeded); diff --git a/src/browser/utils/chatCommands.ts b/src/browser/utils/chatCommands.ts index f7264c9528..c378d0950d 100644 --- a/src/browser/utils/chatCommands.ts +++ b/src/browser/utils/chatCommands.ts @@ -690,20 +690,44 @@ export function prepareCompactionMessage(options: CompactionOptions): { } /** - * Execute a compaction command + * Execute a compaction command via the control-plane endpoint. + * This ensures compaction cannot be dropped or treated as a normal message. */ export async function executeCompaction( options: CompactionOptions & { api: RouterClient } ): Promise { - const { messageText, metadata, sendOptions } = prepareCompactionMessage(options); + // Resolve compaction model preference + const effectiveModel = resolveCompactionModel(options.model); + + // Map source to control-plane format + const source: "user" | "force-compaction" | "idle-compaction" = + options.source === "idle-compaction" ? "idle-compaction" : "user"; + + // Build continue message if provided + const continueMode = options.continueMessage?.mode ?? "exec"; + const continueMessage = options.continueMessage + ? { + text: options.continueMessage.text, + imageParts: options.continueMessage.imageParts, + model: options.continueMessage.model ?? options.sendMessageOptions.model, + mode: continueMode, + } + : undefined; - const result = await options.api.workspace.sendMessage({ + // Call the control-plane compactHistory endpoint + const result = await options.api.workspace.compactHistory({ workspaceId: options.workspaceId, - message: messageText, - options: { - ...sendOptions, - muxMetadata: metadata, - editMessageId: options.editMessageId, + model: effectiveModel, + maxOutputTokens: options.maxOutputTokens, + continueMessage, + source, + // For edits, we need to interrupt and abandon partial + interrupt: options.editMessageId ? "abandonPartial" : undefined, + sendMessageOptions: { + model: options.sendMessageOptions.model, + thinkingLevel: options.sendMessageOptions.thinkingLevel, + providerOptions: options.sendMessageOptions.providerOptions, + experiments: options.sendMessageOptions.experiments, }, }); diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 8228209a06..2ec38cd6ff 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -332,6 +332,45 @@ export const workspace = { }), output: ResultSchema(z.void(), z.string()), }, + /** + * Compact history: control-plane endpoint for compaction. + * Atomically interrupts any active stream (if requested) and starts compaction. + * Returns immediately after compaction starts; completion is signaled via chat-event. + */ + compactHistory: { + input: z.object({ + workspaceId: z.string(), + /** Model to use for compaction (defaults to workspace's current model) */ + model: z.string().optional(), + /** Max output tokens for compaction summary */ + maxOutputTokens: z.number().optional(), + /** Message to auto-send after compaction completes */ + continueMessage: z + .object({ + text: z.string(), + imageParts: z.array(ImagePartSchema).optional(), + model: z.string().optional(), + mode: z.enum(["exec", "plan"]).optional(), + }) + .optional(), + /** Source of the compaction request for telemetry/behavior */ + source: z.enum(["user", "force-compaction", "idle-compaction"]).optional(), + /** How to handle an active stream: none (fail if streaming), graceful (wait), abandonPartial (abort) */ + interrupt: z.enum(["none", "graceful", "abandonPartial"]).optional(), + /** Send message options (model, thinking level, etc.) for compaction stream */ + sendMessageOptions: SendMessageOptionsSchema.omit({ + editMessageId: true, + muxMetadata: true, + }).optional(), + }), + output: ResultSchema( + z.object({ + /** Unique ID for this compaction operation (can be used to correlate events) */ + operationId: z.string(), + }), + SendMessageErrorSchema + ), + }, replaceChatHistory: { input: z.object({ workspaceId: z.string(), diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index fd69479797..06dd5888dd 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -755,6 +755,23 @@ export const router = (authToken?: string) => { } return { success: true, data: undefined }; }), + compactHistory: t + .input(schemas.workspace.compactHistory.input) + .output(schemas.workspace.compactHistory.output) + .handler(async ({ context, input }) => { + const result = await context.workspaceService.compactHistory(input.workspaceId, { + model: input.model, + maxOutputTokens: input.maxOutputTokens, + continueMessage: input.continueMessage, + source: input.source, + interrupt: input.interrupt, + sendMessageOptions: input.sendMessageOptions, + }); + if (!result.success) { + return { success: false, error: result.error }; + } + return { success: true, data: result.data }; + }), replaceChatHistory: t .input(schemas.workspace.replaceChatHistory.input) .output(schemas.workspace.replaceChatHistory.output) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 0d5afcddec..4aeda8e894 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -40,6 +40,11 @@ import { TURNS_BETWEEN_ATTACHMENTS } from "@/common/constants/attachments"; import { extractEditedFileDiffs } from "@/common/utils/messages/extractEditedFiles"; import { isValidModelFormat } from "@/common/utils/ai/models"; import { modeToToolPolicy } from "@/common/utils/ui/modeUtils"; +import { + buildCompactionPrompt, + DEFAULT_COMPACTION_WORD_TARGET, + WORDS_TO_TOKENS_RATIO, +} from "@/common/constants/ui"; /** * Tracked file state for detecting external edits. @@ -142,6 +147,21 @@ export class AgentSession { */ private postCompactionContextEnabled = false; + /** + * Active compaction operation (control-plane initiated). + * Used by CompactionHandler to detect compaction completion instead of scanning history. + */ + private activeCompactionOperation: { + operationId: string; + source: "user" | "force-compaction" | "idle-compaction"; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + } | null = null; + constructor(options: AgentSessionOptions) { assert(options, "AgentSession requires options"); const { @@ -178,6 +198,10 @@ export class AgentSession { partialService: this.partialService, emitter: this.emitter, onCompactionComplete, + getActiveCompactionOperation: () => this.activeCompactionOperation, + clearActiveCompactionOperation: () => { + this.activeCompactionOperation = null; + }, }); this.attachAiListeners(); @@ -576,6 +600,180 @@ export class AgentSession { return this.streamWithHistory(model, options); } + /** + * Start a compaction operation (control-plane initiated). + * Unlike sendMessage with muxMetadata, this does not persist a user message. + * Compaction completion is detected via session state, not history scanning. + */ + async startCompaction(options: { + operationId: string; + model?: string; + maxOutputTokens?: number; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + source: "user" | "force-compaction" | "idle-compaction"; + sendMessageOptions?: Omit; + }): Promise> { + this.assertNotDisposed("startCompaction"); + + // Validate model if provided + const model = options.model ?? options.sendMessageOptions?.model; + if (!model || model.trim().length === 0) { + return Err( + createUnknownSendMessageError("No model specified for compaction. Please select a model.") + ); + } + if (!isValidModelFormat(model)) { + return Err({ + type: "invalid_model_string", + message: `Invalid model string format: "${model}". Expected "provider:model-id"`, + }); + } + + // Set active compaction operation (used by CompactionHandler) + this.activeCompactionOperation = { + operationId: options.operationId, + source: options.source, + continueMessage: options.continueMessage, + }; + + // Clean up background processes (they won't be in the summary) + await this.backgroundProcessManager.cleanup(this.workspaceId); + + if (this.disposed) { + this.activeCompactionOperation = null; + return Ok(undefined); + } + + // Queue continue message if provided + if (options.continueMessage) { + const { finalText, metadata } = prepareUserMessageForSend(options.continueMessage); + const continueMode = options.continueMessage.mode ?? "exec"; + const sanitizedOptions: Omit< + SendMessageOptions, + "muxMetadata" | "mode" | "editMessageId" | "imageParts" | "maxOutputTokens" + > & { + imageParts?: typeof options.continueMessage.imageParts; + muxMetadata?: typeof metadata; + } = { + model: options.continueMessage.model ?? model, + thinkingLevel: options.sendMessageOptions?.thinkingLevel, + toolPolicy: modeToToolPolicy(continueMode), + additionalSystemInstructions: options.sendMessageOptions?.additionalSystemInstructions, + providerOptions: options.sendMessageOptions?.providerOptions, + experiments: options.sendMessageOptions?.experiments, + }; + + if (options.continueMessage.imageParts?.length) { + sanitizedOptions.imageParts = options.continueMessage.imageParts; + } + if (metadata) { + sanitizedOptions.muxMetadata = metadata; + } + + this.messageQueue.add(finalText, sanitizedOptions); + this.emitQueuedMessageChanged(); + } + + // Build compaction prompt + const targetWords = options.maxOutputTokens + ? Math.round(options.maxOutputTokens / WORDS_TO_TOKENS_RATIO) + : DEFAULT_COMPACTION_WORD_TARGET; + let compactionPrompt = buildCompactionPrompt(targetWords); + + // If there's a non-default continue message, add context + if (options.continueMessage && options.continueMessage.text.trim() !== "Continue") { + compactionPrompt += `\n\nThe user wants to continue with: ${options.continueMessage.text}`; + } + + // Commit any pending partial + const commitResult = await this.partialService.commitToHistory(this.workspaceId); + if (!commitResult.success) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError(commitResult.error)); + } + + // Get history for compaction + const historyResult = await this.historyService.getHistory(this.workspaceId); + if (!historyResult.success) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError(historyResult.error)); + } + + if (historyResult.data.length === 0) { + this.activeCompactionOperation = null; + return Err(createUnknownSendMessageError("Cannot compact: workspace history is empty.")); + } + + // Build compaction messages (history + compaction prompt as user message) + // Tool policy for compaction: disable all tools + const disableAllTools = [{ regex_match: ".*", action: "disable" as const }]; + + const compactionUserMessage = createMuxMessage( + `compact-prompt-${options.operationId}`, + "user", + compactionPrompt, + { + timestamp: Date.now(), + toolPolicy: disableAllTools, + } + ); + + const compactionMessages = [...historyResult.data, compactionUserMessage]; + + // Enforce thinking policy + const effectiveThinkingLevel = options.sendMessageOptions?.thinkingLevel + ? enforceThinkingPolicy(model, options.sendMessageOptions.thinkingLevel) + : undefined; + + // Stream compaction (no history persistence for the prompt message) + return this.aiService.streamMessage( + compactionMessages, + this.workspaceId, + model, + effectiveThinkingLevel, + disableAllTools, + undefined, + undefined, + options.maxOutputTokens, + options.sendMessageOptions?.providerOptions, + undefined, + undefined, + undefined, + undefined, + options.sendMessageOptions?.experiments + ); + } + + /** + * Get the active compaction operation, if any. + * Used by CompactionHandler to detect compaction completion. + */ + getActiveCompactionOperation(): { + operationId: string; + source: "user" | "force-compaction" | "idle-compaction"; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + } | null { + return this.activeCompactionOperation; + } + + /** + * Clear the active compaction operation. + * Called by CompactionHandler after compaction completes. + */ + clearActiveCompactionOperation(): void { + this.activeCompactionOperation = null; + } + async interruptStream(options?: { soft?: boolean; abandonPartial?: boolean; diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index 5a49aeb8e3..d942264637 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeEach, mock } from "bun:test"; -import { CompactionHandler } from "./compactionHandler"; +import { CompactionHandler, type ActiveCompactionOperation } from "./compactionHandler"; import type { HistoryService } from "./historyService"; import type { PartialService } from "./partialService"; import type { EventEmitter } from "events"; @@ -22,30 +22,26 @@ interface ChatEventData { const createMockHistoryService = () => { let getHistoryResult: Result = Ok([]); - let clearHistoryResult: Result = Ok([]); - let appendToHistoryResult: Result = Ok(undefined); + let replaceHistoryResult: Result = Ok([]); const getHistory = mock((_) => Promise.resolve(getHistoryResult)); - const clearHistory = mock((_) => Promise.resolve(clearHistoryResult)); - const appendToHistory = mock((_, __) => Promise.resolve(appendToHistoryResult)); + const replaceHistory = mock((_, __) => Promise.resolve(replaceHistoryResult)); + + // Unused in compaction tests, but kept for interface compatibility const updateHistory = mock(() => Promise.resolve(Ok(undefined))); const truncateAfterMessage = mock(() => Promise.resolve(Ok(undefined))); return { getHistory, - clearHistory, - appendToHistory, + replaceHistory, updateHistory, truncateAfterMessage, // Allow setting mock return values mockGetHistory: (result: Result) => { getHistoryResult = result; }, - mockClearHistory: (result: Result) => { - clearHistoryResult = result; - }, - mockAppendToHistory: (result: Result) => { - appendToHistoryResult = result; + mockReplaceHistory: (result: Result) => { + replaceHistoryResult = result; }, }; }; @@ -81,12 +77,21 @@ const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } return { emitter: emitter as EventEmitter, events }; }; -const createCompactionRequest = (id = "req-1"): MuxMessage => - createMuxMessage(id, "user", "Please summarize the conversation", { +/** Helper: create a normal user message (not compaction) */ +const createNormalUserMessage = (id = "msg-1"): MuxMessage => + createMuxMessage(id, "user", "Hello, how are you?", { historySequence: 0, - muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, + muxMetadata: { type: "normal" }, }); +/** Helper: create a valid long summary (>=50 words) */ +const createValidSummary = (): string => + "This is a comprehensive summary of the conversation. The user wanted to build a feature for their application. " + + "We discussed the requirements and architecture. Key decisions included using TypeScript for type safety, " + + "implementing a control-plane pattern for reliability, and adding validation for data integrity. " + + "The implementation is now complete with proper error handling and tests. " + + "Next steps involve deployment and monitoring of the new feature in production."; + const createStreamEndEvent = ( summary: string, metadata?: Record @@ -106,12 +111,11 @@ const createStreamEndEvent = ( // DRY helper to set up successful compaction scenario const setupSuccessfulCompaction = ( mockHistoryService: ReturnType, - messages: MuxMessage[] = [createCompactionRequest()], - clearedSequences?: number[] + messages: MuxMessage[] = [createNormalUserMessage()], + deletedSequences?: number[] ) => { mockHistoryService.mockGetHistory(Ok(messages)); - mockHistoryService.mockClearHistory(Ok(clearedSequences ?? messages.map((_, i) => i))); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + mockHistoryService.mockReplaceHistory(Ok(deletedSequences ?? messages.map((_, i) => i))); }; describe("CompactionHandler", () => { @@ -122,6 +126,8 @@ describe("CompactionHandler", () => { let telemetryCapture: ReturnType; let telemetryService: TelemetryService; let emittedEvents: EmittedEvent[]; + let activeOperation: ActiveCompactionOperation | null; + let clearActiveOperationCalled: boolean; const workspaceId = "test-workspace"; beforeEach(() => { @@ -137,44 +143,60 @@ describe("CompactionHandler", () => { mockHistoryService = createMockHistoryService(); mockPartialService = createMockPartialService(); + // Default: no active compaction operation + activeOperation = null; + clearActiveOperationCalled = false; + handler = new CompactionHandler({ workspaceId, historyService: mockHistoryService as unknown as HistoryService, telemetryService, partialService: mockPartialService as unknown as PartialService, emitter: mockEmitter, + getActiveCompactionOperation: () => activeOperation, + clearActiveCompactionOperation: () => { + clearActiveOperationCalled = true; + activeOperation = null; + }, }); }); - describe("handleCompletion() - Normal Compaction Flow", () => { - it("should return false when no compaction request found", async () => { - const normalMsg = createMuxMessage("msg1", "user", "Hello", { - historySequence: 0, - muxMetadata: { type: "normal" }, - }); - mockHistoryService.mockGetHistory(Ok([normalMsg])); + /** Helper to set an active compaction operation */ - const event = createStreamEndEvent("Summary"); - const result = await handler.handleCompletion(event); + const getReplaceHistoryCalls = (): Array<[string, MuxMessage[]]> => { + return mockHistoryService.replaceHistory.mock.calls as unknown as Array<[string, MuxMessage[]]>; + }; - expect(result).toBe(false); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(0); - }); + const getReplacedMessage = (): MuxMessage => { + const calls = getReplaceHistoryCalls(); + const [, messages] = calls[0]; + return messages[0]; + }; + const setActiveOperation = ( + operationId: string, + source: "user" | "force-compaction" | "idle-compaction" = "user" + ) => { + activeOperation = { operationId, source }; + }; - it("should return false when historyService fails", async () => { - mockHistoryService.mockGetHistory(Err("Database error")); + describe("handleCompletion() - Control-plane Compaction", () => { + it("should return false when no active compaction operation", async () => { + // No active operation set + const msg = createNormalUserMessage(); + mockHistoryService.mockGetHistory(Ok([msg])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); expect(result).toBe(false); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); }); it("should capture compaction_completed telemetry on successful compaction", async () => { - const compactionReq = createCompactionRequest(); - setupSuccessfulCompaction(mockHistoryService, [compactionReq]); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary", { + const event = createStreamEndEvent(createValidSummary(), { duration: 1500, // Prefer contextUsage (context size) over total usage. contextUsage: { inputTokens: 1000, outputTokens: 333, totalTokens: undefined }, @@ -202,102 +224,62 @@ describe("CompactionHandler", () => { }); it("should return true when successful", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Complete summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); expect(result).toBe(true); + expect(clearActiveOperationCalled).toBe(true); }); - it("should join multiple text parts from event.parts", async () => { - const compactionReq = createCompactionRequest(); - setupSuccessfulCompaction(mockHistoryService, [compactionReq]); - - // Create event with multiple text parts - const event: StreamEndEvent = { - type: "stream-end", - workspaceId: "test-workspace", - messageId: "msg-id", - parts: [ - { type: "text", text: "Part 1 " }, - { type: "text", text: "Part 2 " }, - { type: "text", text: "Part 3" }, - ], - metadata: { - model: "claude-3-5-sonnet-20241022", - usage: { inputTokens: 100, outputTokens: 50, totalTokens: undefined }, - duration: 1500, - }, - }; - await handler.handleCompletion(event); - - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( - "Part 1 Part 2 Part 3" - ); - }); - - it("should extract summary text from event.parts", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should extract and store summary text from event.parts", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("This is the summary"); + const validSummary = createValidSummary(); + const event = createStreamEndEvent(validSummary); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe( - "This is the summary" - ); + const replacedMsg = getReplacedMessage(); + expect((replacedMsg.parts[0] as { type: "text"; text: string }).text).toBe(validSummary); }); it("should delete partial.json before clearing history (race condition fix)", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); // deletePartial should be called once before clearHistory expect(mockPartialService.deletePartial.mock.calls).toHaveLength(1); expect(mockPartialService.deletePartial.mock.calls[0][0]).toBe(workspaceId); - - // Verify deletePartial was called (we can't easily verify order without more complex mocking, - // but the important thing is that it IS called during compaction) }); - it("should call clearHistory() and appendToHistory()", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should call replaceHistory()", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const validSummary = createValidSummary(); + const event = createStreamEndEvent(validSummary); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.clearHistory.mock.calls[0][0]).toBe(workspaceId); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls[0][0]).toBe(workspaceId); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect(appendedMsg.role).toBe("assistant"); - expect((appendedMsg.parts[0] as { type: "text"; text: string }).text).toBe("Summary"); + const calls = getReplaceHistoryCalls(); + expect(calls).toHaveLength(1); + expect(calls[0][0]).toBe(workspaceId); + const replacedMsg = getReplacedMessage(); + expect(replacedMsg.role).toBe("assistant"); + expect((replacedMsg.parts[0] as { type: "text"; text: string }).text).toBe(validSummary); }); it("should emit delete event for old messages", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1, 2, 3])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Ok([0, 1, 2, 3])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const deleteEvent = emittedEvents.find( @@ -309,13 +291,11 @@ describe("CompactionHandler", () => { }); it("should emit summary message with complete metadata", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); const usage = { inputTokens: 200, outputTokens: 100, totalTokens: 300 }; - const event = createStreamEndEvent("Summary", { + const event = createStreamEndEvent(createValidSummary(), { model: "claude-3-5-sonnet-20241022", usage, duration: 2000, @@ -342,12 +322,10 @@ describe("CompactionHandler", () => { }); it("should emit stream-end event to frontend", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary", { duration: 1234 }); + const event = createStreamEndEvent(createValidSummary(), { duration: 1234 }); await handler.handleCompletion(event); const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); @@ -358,57 +336,96 @@ describe("CompactionHandler", () => { }); it("should set compacted in summary metadata", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect(appendedMsg.metadata?.compacted).toBe("user"); + const replacedMsg = getReplacedMessage(); + expect(replacedMsg.metadata?.compacted).toBe("user"); + }); + }); + + describe("handleCompletion() - Summary Validation", () => { + it("should reject empty summary and not clear history", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + + const event = createStreamEndEvent(""); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); // Still returns true (was a compaction attempt) + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); + expect(clearActiveOperationCalled).toBe(true); + }); + + it("should reject summary that is too short (<50 words)", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + + // Only ~10 words + const event = createStreamEndEvent( + "This is a very short summary that won't pass validation." + ); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); + expect(clearActiveOperationCalled).toBe(true); + }); + + it("should accept summary at minimum word count", async () => { + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); + + // Exactly 50 words + const fiftyWords = Array(50).fill("word").join(" "); + const event = createStreamEndEvent(fiftyWords); + const result = await handler.handleCompletion(event); + + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); }); describe("handleCompletion() - Deduplication", () => { - it("should track processed compaction-request IDs", async () => { - const compactionReq = createCompactionRequest("req-unique"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should track processed operation IDs", async () => { + setActiveOperation("op-unique"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); - it("should return true without re-processing when same request ID seen twice", async () => { - const compactionReq = createCompactionRequest("req-dupe"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + it("should return true without re-processing when same operation ID seen twice", async () => { + setActiveOperation("op-dupe"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result1 = await handler.handleCompletion(event); + + // Re-set operation since it was cleared + setActiveOperation("op-dupe"); const result2 = await handler.handleCompletion(event); expect(result1).toBe(true); expect(result2).toBe(true); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); it("should not emit duplicate events", async () => { - const compactionReq = createCompactionRequest("req-dupe-2"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-dupe-2"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const eventCountAfterFirst = emittedEvents.length; + // Re-set operation since it was cleared + setActiveOperation("op-dupe-2"); await handler.handleCompletion(event); const eventCountAfterSecond = emittedEvents.length; @@ -416,77 +433,74 @@ describe("CompactionHandler", () => { }); it("should not clear history twice", async () => { - const compactionReq = createCompactionRequest("req-dupe-3"); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-dupe-3"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()], [0]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); + + // Re-set operation since it was cleared + setActiveOperation("op-dupe-3"); await handler.handleCompletion(event); - expect(mockHistoryService.clearHistory.mock.calls).toHaveLength(1); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(1); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); }); }); describe("Error Handling", () => { - it("should return false when clearHistory() fails", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); + it("should return true but not replace history when replaceHistory() fails", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); const result = await handler.handleCompletion(event); - expect(result).toBe(false); - expect(mockHistoryService.appendToHistory.mock.calls).toHaveLength(0); - }); - - it("should return false when appendToHistory() fails", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Err("Append failed")); - - const event = createStreamEndEvent("Summary"); - const result = await handler.handleCompletion(event); + // Returns true because it was a compaction attempt (even though it failed) + expect(result).toBe(true); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(1); + expect(clearActiveOperationCalled).toBe(true); - expect(result).toBe(false); + // Should not emit summary/delete events on failure + const summaryEvent = emittedEvents.find((_e) => { + const m = _e.data.message as MuxMessage | undefined; + return m?.role === "assistant" && m?.parts !== undefined; + }); + expect(summaryEvent).toBeUndefined(); }); it("should log errors but not throw", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Database corruption")); + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Database corruption")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); // Should not throw const result = await handler.handleCompletion(event); - expect(result).toBe(false); + expect(result).toBe(true); }); - it("should not emit events when compaction fails mid-process", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Err("Clear failed")); + it("should emit stream-end even when compaction fails", async () => { + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Err("Replace failed")); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); - expect(emittedEvents).toHaveLength(0); + // stream-end should be emitted so UI updates + const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); + expect(streamEndEvent).toBeDefined(); }); }); describe("Event Emission", () => { it("should include workspaceId in all chat-event emissions", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const chatEvents = emittedEvents.filter((e) => e.event === "chat-event"); @@ -497,12 +511,11 @@ describe("CompactionHandler", () => { }); it("should emit DeleteMessage with correct type and historySequences array", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([5, 10, 15])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); + mockHistoryService.mockReplaceHistory(Ok([5, 10, 15])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const deleteEvent = emittedEvents.find( @@ -515,12 +528,11 @@ describe("CompactionHandler", () => { }); it("should emit summary message with proper MuxMessage structure", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary text"); + const validSummary = createValidSummary(); + const event = createStreamEndEvent(validSummary); await handler.handleCompletion(event); const summaryEvent = emittedEvents.find((_e) => { @@ -532,7 +544,7 @@ describe("CompactionHandler", () => { expect(summaryMsg).toMatchObject({ id: expect.stringContaining("summary-") as string, role: "assistant", - parts: [{ type: "text", text: "Summary text" }], + parts: [{ type: "text", text: validSummary }], metadata: expect.objectContaining({ compacted: "user", muxMetadata: { type: "normal" }, @@ -541,12 +553,10 @@ describe("CompactionHandler", () => { }); it("should forward stream events (stream-end, stream-abort) correctly", async () => { - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([compactionReq])); - mockHistoryService.mockClearHistory(Ok([0])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-1"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); - const event = createStreamEndEvent("Summary", { customField: "test" }); + const event = createStreamEndEvent(createValidSummary(), { customField: "test" }); await handler.handleCompletion(event); const streamEndEvent = emittedEvents.find((_e) => _e.data.message === event); @@ -563,21 +573,12 @@ describe("CompactionHandler", () => { timestamp: originalTimestamp, historySequence: 0, }); - const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", { - historySequence: 1, - muxMetadata: { - type: "compaction-request", - source: "idle-compaction", - rawCommand: "/compact", - parsed: {}, - }, - }); - mockHistoryService.mockGetHistory(Ok([userMessage, idleCompactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-idle", "idle-compaction"); + mockHistoryService.mockGetHistory(Ok([userMessage])); + mockHistoryService.mockReplaceHistory(Ok([0])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const summaryEvent = emittedEvents.find((_e) => { @@ -597,21 +598,12 @@ describe("CompactionHandler", () => { compacted: "user", historySequence: 0, }); - const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", { - historySequence: 1, - muxMetadata: { - type: "compaction-request", - source: "idle-compaction", - rawCommand: "/compact", - parsed: {}, - }, - }); - mockHistoryService.mockGetHistory(Ok([compactedMessage, idleCompactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-idle", "idle-compaction"); + mockHistoryService.mockGetHistory(Ok([compactedMessage])); + mockHistoryService.mockReplaceHistory(Ok([0])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const summaryEvent = emittedEvents.find((_e) => { @@ -635,21 +627,12 @@ describe("CompactionHandler", () => { timestamp: newerUserTimestamp, historySequence: 1, }); - const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", { - historySequence: 2, - muxMetadata: { - type: "compaction-request", - source: "idle-compaction", - rawCommand: "/compact", - parsed: {}, - }, - }); - mockHistoryService.mockGetHistory(Ok([compactedMessage, userMessage, idleCompactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1, 2])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + setActiveOperation("op-idle", "idle-compaction"); + mockHistoryService.mockGetHistory(Ok([compactedMessage, userMessage])); + mockHistoryService.mockReplaceHistory(Ok([0, 1])); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const summaryEvent = emittedEvents.find((_e) => { @@ -662,57 +645,20 @@ describe("CompactionHandler", () => { expect(summaryMsg.metadata?.timestamp).toBe(newerUserTimestamp); }); - it("should skip compaction-request message when finding timestamp to preserve", async () => { - const originalTimestamp = Date.now() - 3600 * 1000; // 1 hour ago - the real user message - const freshTimestamp = Date.now(); // The compaction request has a fresh timestamp - const userMessage = createMuxMessage("user-1", "user", "Hello", { - timestamp: originalTimestamp, - historySequence: 0, - }); - // Idle compaction request WITH a timestamp (as happens in production) - const idleCompactionReq = createMuxMessage("req-1", "user", "Summarize", { - timestamp: freshTimestamp, - historySequence: 1, - muxMetadata: { - type: "compaction-request", - source: "idle-compaction", - rawCommand: "/compact", - parsed: {}, - }, - }); - - mockHistoryService.mockGetHistory(Ok([userMessage, idleCompactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); - - const event = createStreamEndEvent("Summary"); - await handler.handleCompletion(event); - - const summaryEvent = emittedEvents.find((_e) => { - const m = _e.data.message as MuxMessage | undefined; - return m?.role === "assistant" && m?.metadata?.compacted; - }); - expect(summaryEvent).toBeDefined(); - const summaryMsg = summaryEvent?.data.message as MuxMessage; - // Should use the OLD user message timestamp, NOT the fresh compaction request timestamp - expect(summaryMsg.metadata?.timestamp).toBe(originalTimestamp); - expect(summaryMsg.metadata?.compacted).toBe("idle"); - }); - - it("should use current time for non-idle compaction", async () => { + it("should use current time for non-idle compaction (user source)", async () => { const oldTimestamp = Date.now() - 3600 * 1000; // 1 hour ago const userMessage = createMuxMessage("user-1", "user", "Hello", { timestamp: oldTimestamp, historySequence: 0, }); - // Regular compaction (not idle) - const compactionReq = createCompactionRequest(); - mockHistoryService.mockGetHistory(Ok([userMessage, compactionReq])); - mockHistoryService.mockClearHistory(Ok([0, 1])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); + + // Regular compaction (not idle) - uses "user" source + setActiveOperation("op-user", "user"); + mockHistoryService.mockGetHistory(Ok([userMessage])); + mockHistoryService.mockReplaceHistory(Ok([0])); const beforeTime = Date.now(); - const event = createStreamEndEvent("Summary"); + const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); const afterTime = Date.now(); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 62cbeeae51..e1240d6743 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -3,7 +3,7 @@ import type { HistoryService } from "./historyService"; import type { PartialService } from "./partialService"; import type { StreamEndEvent } from "@/common/types/stream"; -import type { WorkspaceChatMessage, DeleteMessage } from "@/common/orpc/types"; +import type { WorkspaceChatMessage, DeleteMessage, ImagePart } from "@/common/orpc/types"; import type { Result } from "@/common/types/result"; import { Ok, Err } from "@/common/types/result"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; @@ -18,6 +18,21 @@ import { } from "@/common/utils/messages/extractEditedFiles"; import { computeRecencyFromMessages } from "@/common/utils/recency"; +/** Minimum word count for a valid compaction summary */ +const MIN_SUMMARY_WORDS = 50; + +/** Active compaction operation tracked via session state (control-plane) */ +export interface ActiveCompactionOperation { + operationId: string; + source: "user" | "force-compaction" | "idle-compaction"; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; +} + interface CompactionHandlerOptions { workspaceId: string; historyService: HistoryService; @@ -26,15 +41,24 @@ interface CompactionHandlerOptions { emitter: EventEmitter; /** Called when compaction completes successfully (e.g., to clear idle compaction pending state) */ onCompactionComplete?: () => void; + /** Get active compaction operation from session state (control-plane) */ + getActiveCompactionOperation: () => ActiveCompactionOperation | null; + /** Clear active compaction operation after completion */ + clearActiveCompactionOperation: () => void; } /** * Handles history compaction for agent sessions * * Responsible for: - * - Detecting compaction requests in stream events - * - Replacing chat history with compacted summaries + * - Detecting compaction operations via session state + * - Replacing chat history with compacted summaries (only on successful completion) * - Preserving cumulative usage across compactions + * + * IMPORTANT: History is only replaced when: + * 1. An active compaction operation exists in session state + * 2. The stream completed successfully (stream-end, not stream-abort/error) + * 3. The summary text is valid (non-empty, meets minimum length) */ export class CompactionHandler { private readonly workspaceId: string; @@ -44,6 +68,8 @@ export class CompactionHandler { private readonly emitter: EventEmitter; private readonly processedCompactionRequestIds: Set = new Set(); private readonly onCompactionComplete?: () => void; + private readonly getActiveCompactionOperation: () => ActiveCompactionOperation | null; + private readonly clearActiveCompactionOperation: () => void; /** Flag indicating post-compaction attachments should be generated on next turn */ private postCompactionAttachmentsPending = false; @@ -57,6 +83,8 @@ export class CompactionHandler { this.telemetryService = options.telemetryService; this.emitter = options.emitter; this.onCompactionComplete = options.onCompactionComplete; + this.getActiveCompactionOperation = options.getActiveCompactionOperation; + this.clearActiveCompactionOperation = options.clearActiveCompactionOperation; } /** @@ -86,55 +114,113 @@ export class CompactionHandler { } /** - * Handle compaction stream completion + * Handle compaction stream completion. + * + * Only processes compaction if there's an active operation in session state. + * This ensures compaction can only be triggered via the control-plane + * (compactHistory endpoint), not by user messages with special metadata. * - * Detects when a compaction stream finishes, extracts the summary, - * and performs history replacement atomically. + * @returns true if this was a compaction stream, false otherwise */ async handleCompletion(event: StreamEndEvent): Promise { - // Check if the last user message is a compaction-request - const historyResult = await this.historyService.getHistory(this.workspaceId); - if (!historyResult.success) { - return false; - } - - const messages = historyResult.data; - const lastUserMsg = [...messages].reverse().find((m) => m.role === "user"); - const isCompaction = lastUserMsg?.metadata?.muxMetadata?.type === "compaction-request"; - - if (!isCompaction || !lastUserMsg) { + const activeOperation = this.getActiveCompactionOperation(); + if (!activeOperation) { + // No active compaction - this is a normal stream completion return false; } - // Dedupe: If we've already processed this compaction-request, skip - if (this.processedCompactionRequestIds.has(lastUserMsg.id)) { + // Dedupe by operationId (prevents double-processing on reconnect) + if (this.processedCompactionRequestIds.has(activeOperation.operationId)) { + log.debug("Skipping already-processed compaction operation", { + operationId: activeOperation.operationId, + }); return true; } + // Extract summary text from stream parts const summary = event.parts .filter((part): part is { type: "text"; text: string } => part.type === "text") .map((part) => part.text) .join(""); - // Check if this was an idle-compaction (auto-triggered due to inactivity) - const muxMeta = lastUserMsg.metadata?.muxMetadata; - const isIdleCompaction = - muxMeta?.type === "compaction-request" && muxMeta.source === "idle-compaction"; + // Validate summary before replacing history + const validationResult = this.validateSummary(summary); + if (!validationResult.valid) { + log.error("Compaction summary validation failed:", { + operationId: activeOperation.operationId, + reason: validationResult.reason, + summaryLength: summary.length, + }); + this.clearActiveCompactionOperation(); + // Emit stream-end so UI updates, but history remains unchanged + this.emitChatEvent(event); + return true; + } + + // Get current history for compaction + const historyResult = await this.historyService.getHistory(this.workspaceId); + if (!historyResult.success) { + log.error("Failed to get history for compaction:", historyResult.error); + this.clearActiveCompactionOperation(); + this.emitChatEvent(event); + return true; + } // Mark as processed before performing compaction - this.processedCompactionRequestIds.add(lastUserMsg.id); + this.processedCompactionRequestIds.add(activeOperation.operationId); + const isIdleCompaction = activeOperation.source === "idle-compaction"; const result = await this.performCompaction( summary, event.metadata, - messages, + historyResult.data, isIdleCompaction ); + if (!result.success) { - log.error("Compaction failed:", result.error); - return false; + log.error("Compaction failed:", { + operationId: activeOperation.operationId, + error: result.error, + }); + this.clearActiveCompactionOperation(); + this.emitChatEvent(event); + return true; + } + + // Success - capture telemetry and notify + this.captureCompactionTelemetry(event, isIdleCompaction ? "idle" : "manual"); + this.clearActiveCompactionOperation(); + this.onCompactionComplete?.(); + + // Emit stream-end so UI knows compaction completed + this.emitChatEvent(event); + return true; + } + + /** + * Validate that a summary is suitable for replacing history. + * Prevents data loss from empty or truncated summaries. + */ + private validateSummary(summary: string): { valid: true } | { valid: false; reason: string } { + if (!summary || summary.trim().length === 0) { + return { valid: false, reason: "Summary is empty" }; + } + + const wordCount = summary.trim().split(/\s+/).length; + if (wordCount < MIN_SUMMARY_WORDS) { + return { + valid: false, + reason: `Summary too short: ${wordCount} words (minimum: ${MIN_SUMMARY_WORDS})`, + }; } + return { valid: true }; + } + + /** + * Capture telemetry for compaction completion + */ + private captureCompactionTelemetry(event: StreamEndEvent, source: "idle" | "manual"): void { const durationSecs = typeof event.metadata.duration === "number" ? event.metadata.duration / 1000 : 0; const inputTokens = @@ -149,16 +235,9 @@ export class CompactionHandler { duration_b2: roundToBase2(durationSecs), input_tokens_b2: roundToBase2(inputTokens ?? 0), output_tokens_b2: roundToBase2(outputTokens ?? 0), - compaction_source: isIdleCompaction ? "idle" : "manual", + compaction_source: source, }, }); - - // Notify that compaction completed (clears idle compaction pending state) - this.onCompactionComplete?.(); - - // Emit stream-end to frontend so UI knows compaction is complete - this.emitChatEvent(event); - return true; } /** @@ -197,13 +276,6 @@ export class CompactionHandler { // Extract diffs BEFORE clearing history (they'll be gone after clear) this.cachedFileDiffs = extractEditedFileDiffs(messages); - // Clear entire history and get deleted sequences - const clearResult = await this.historyService.clearHistory(this.workspaceId); - if (!clearResult.success) { - return Err(`Failed to clear history: ${clearResult.error}`); - } - const deletedSequences = clearResult.data; - // For idle compaction, preserve the original recency timestamp so the workspace // doesn't appear "recently used" in the sidebar. Use the shared recency utility // to ensure consistency with how the sidebar computes recency. @@ -237,14 +309,15 @@ export class CompactionHandler { } ); - // Append summary to history - const appendResult = await this.historyService.appendToHistory( - this.workspaceId, - summaryMessage - ); - if (!appendResult.success) { - return Err(`Failed to append summary: ${appendResult.error}`); + // Atomically replace history with the single summary message. + // This avoids the "delete then crash" failure mode. + const replaceResult = await this.historyService.replaceHistory(this.workspaceId, [ + summaryMessage, + ]); + if (!replaceResult.success) { + return Err(`Failed to replace history: ${replaceResult.error}`); } + const deletedSequences = replaceResult.data; // Set flag to trigger post-compaction attachment injection on next turn this.postCompactionAttachmentsPending = true; diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index fbb86a6b80..531c9020d9 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -422,6 +422,60 @@ export class HistoryService { }); } + /** + * Atomically replace the entire chat history with the provided messages. + * + * This is the preferred primitive for compaction: it guarantees we never delete history + * and then crash before writing the replacement. The underlying write is atomic. + * + * @returns Result containing array of deleted historySequence numbers + */ + async replaceHistory( + workspaceId: string, + newMessages: MuxMessage[] + ): Promise> { + return this.fileLocks.withLock(workspaceId, async () => { + try { + const historyPath = this.getChatHistoryPath(workspaceId); + const workspaceDir = this.config.getSessionDir(workspaceId); + await fs.mkdir(workspaceDir, { recursive: true }); + + const historyResult = await this.getHistory(workspaceId); + const deletedSequences = historyResult.success + ? historyResult.data + .map((msg) => msg.metadata?.historySequence ?? -1) + .filter((s) => s >= 0) + : []; + + // Normalize sequence numbers to start at 0 + const normalizedMessages = newMessages.map((msg, index) => { + return { + ...msg, + metadata: { + ...(msg.metadata ?? {}), + historySequence: index, + }, + }; + }); + + const historyEntries = normalizedMessages + .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n") + .join(""); + + // Atomic write prevents corruption if app crashes mid-write. + // If historyEntries is empty, this creates/overwrites the file with an empty payload. + await writeFileAtomic(historyPath, historyEntries); + + // Update sequence counter to continue from the end + this.sequenceCounters.set(workspaceId, normalizedMessages.length); + + return Ok(deletedSequences); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + return Err(`Failed to replace history: ${message}`); + } + }); + } async clearHistory(workspaceId: string): Promise> { const result = await this.truncateHistory(workspaceId, 1.0); if (!result.success) { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 9f8303f288..cbb63cb951 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -1873,6 +1873,110 @@ export class WorkspaceService extends EventEmitter { return Ok(undefined); } + /** + * Control-plane endpoint for compaction. + * Atomically interrupts any active stream (if requested) and starts compaction. + * + * Unlike sendMessage with muxMetadata, this: + * - Does not persist a user message with the compaction prompt + * - Uses session state to track compaction operation + * - Provides explicit operationId for correlation + */ + async compactHistory( + workspaceId: string, + options: { + model?: string; + maxOutputTokens?: number; + continueMessage?: { + text: string; + imageParts?: ImagePart[]; + model?: string; + mode?: "exec" | "plan"; + }; + source?: "user" | "force-compaction" | "idle-compaction"; + interrupt?: "none" | "graceful" | "abandonPartial"; + sendMessageOptions?: Omit; + } + ): Promise> { + const operationId = `compact-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; + const source = options.source ?? "user"; + const interrupt = options.interrupt ?? "none"; + + log.debug("compactHistory: starting", { + workspaceId, + operationId, + source, + interrupt, + hasModel: !!options.model, + hasContinueMessage: !!options.continueMessage, + }); + + try { + // Block operations on workspaces being renamed/removed + if (this.renamingWorkspaces.has(workspaceId)) { + return Err({ + type: "unknown", + raw: "Workspace is being renamed. Please wait and try again.", + }); + } + if (this.removingWorkspaces.has(workspaceId)) { + return Err({ + type: "unknown", + raw: "Workspace is being deleted. Please wait and try again.", + }); + } + if (!this.config.findWorkspace(workspaceId)) { + return Err({ type: "unknown", raw: "Workspace not found. It may have been deleted." }); + } + + const session = this.getOrCreateSession(workspaceId); + + // Handle active stream based on interrupt mode + if (this.aiService.isStreaming(workspaceId)) { + switch (interrupt) { + case "none": + return Err({ + type: "unknown", + raw: "Cannot compact while stream is active. Use interrupt option or wait for stream to complete.", + }); + case "graceful": + // Wait for stream to complete (soft interrupt) + await session.interruptStream({ soft: true }); + break; + case "abandonPartial": + // Abort immediately + await session.interruptStream({ abandonPartial: true }); + break; + } + } + + // Start compaction via session + const result = await session.startCompaction({ + operationId, + model: options.model, + maxOutputTokens: options.maxOutputTokens, + continueMessage: options.continueMessage, + source, + sendMessageOptions: options.sendMessageOptions, + }); + + if (!result.success) { + return result; + } + + // Skip recency update for idle compaction to preserve "last used" time + if (source !== "idle-compaction") { + void this.updateRecencyTimestamp(workspaceId, Date.now()); + } + + return Ok({ operationId }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : JSON.stringify(error, null, 2); + log.error("Unexpected error in compactHistory:", error); + return Err({ type: "unknown", raw: `Failed to compact history: ${errorMessage}` }); + } + } + async replaceHistory( workspaceId: string, summaryMessage: MuxMessage, From 496991ee23628c1800d2ff0e1f0730c698282e8b Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 13:57:50 -0600 Subject: [PATCH 2/9] =?UTF-8?q?=F0=9F=A4=96=20fix:=20bind=20compaction=20t?= =?UTF-8?q?o=20stream=20messageId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node/services/agentSession.ts | 37 +++++++++++++++++++-- src/node/services/compactionHandler.test.ts | 17 ++++++++-- src/node/services/compactionHandler.ts | 9 +++++ 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 4aeda8e894..41a3638b18 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -153,6 +153,8 @@ export class AgentSession { */ private activeCompactionOperation: { operationId: string; + /** Stream messageId for the compaction summary stream (set from stream-start) */ + streamMessageId: string | null; source: "user" | "force-compaction" | "idle-compaction"; continueMessage?: { text: string; @@ -637,6 +639,7 @@ export class AgentSession { // Set active compaction operation (used by CompactionHandler) this.activeCompactionOperation = { operationId: options.operationId, + streamMessageId: null, source: options.source, continueMessage: options.continueMessage, }; @@ -755,6 +758,7 @@ export class AgentSession { */ getActiveCompactionOperation(): { operationId: string; + streamMessageId: string | null; source: "user" | "force-compaction" | "idle-compaction"; continueMessage?: { text: string; @@ -887,7 +891,16 @@ export class AgentSession { this.aiService.on(event, wrapped as never); }; - forward("stream-start", (payload) => this.emitChatEvent(payload)); + forward("stream-start", (payload) => { + // Bind the active compaction operation to the specific stream messageId. + // This prevents unrelated streams from being misinterpreted as compaction. + const activeOperation = this.activeCompactionOperation; + if (activeOperation?.streamMessageId === null && payload.type === "stream-start") { + activeOperation.streamMessageId = payload.messageId; + } + + this.emitChatEvent(payload); + }); forward("stream-delta", (payload) => this.emitChatEvent(payload)); forward("tool-call-start", (payload) => this.emitChatEvent(payload)); forward("bash-output", (payload) => this.emitChatEvent(payload)); @@ -912,7 +925,18 @@ export class AgentSession { forward("reasoning-delta", (payload) => this.emitChatEvent(payload)); forward("reasoning-end", (payload) => this.emitChatEvent(payload)); forward("usage-delta", (payload) => this.emitChatEvent(payload)); - forward("stream-abort", (payload) => this.emitChatEvent(payload)); + forward("stream-abort", (payload) => { + // If the compaction stream was aborted, clear the active compaction operation so + // subsequent stream-end events cannot replace history. + if ( + this.activeCompactionOperation && + payload.type === "stream-abort" && + this.activeCompactionOperation.streamMessageId === payload.messageId + ) { + this.activeCompactionOperation = null; + } + this.emitChatEvent(payload); + }); forward("stream-end", async (payload) => { const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent); @@ -943,6 +967,15 @@ export class AgentSession { error: string; errorType?: string; }; + // If the compaction stream errored, clear the active compaction operation so + // subsequent stream-end events cannot replace history. + if ( + this.activeCompactionOperation && + this.activeCompactionOperation.streamMessageId === data.messageId + ) { + this.activeCompactionOperation = null; + } + const streamError: StreamErrorMessage = { type: "stream-error", messageId: data.messageId, diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index d942264637..ae5473cc9c 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -174,12 +174,25 @@ describe("CompactionHandler", () => { }; const setActiveOperation = ( operationId: string, - source: "user" | "force-compaction" | "idle-compaction" = "user" + source: "user" | "force-compaction" | "idle-compaction" = "user", + streamMessageId: string | null = "msg-id" ) => { - activeOperation = { operationId, source }; + activeOperation = { operationId, streamMessageId, source }; }; describe("handleCompletion() - Control-plane Compaction", () => { + it("should ignore stream-end events that do not match the compaction stream messageId", async () => { + // Active operation, but bound to a different stream + setActiveOperation("op-1", "user", "different-message-id"); + setupSuccessfulCompaction(mockHistoryService, [createNormalUserMessage()]); + + const event = createStreamEndEvent(createValidSummary()); + const result = await handler.handleCompletion(event); + + // Not treated as compaction + expect(result).toBe(false); + expect(mockHistoryService.replaceHistory.mock.calls).toHaveLength(0); + }); it("should return false when no active compaction operation", async () => { // No active operation set const msg = createNormalUserMessage(); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index e1240d6743..9ae33b2903 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -24,6 +24,8 @@ const MIN_SUMMARY_WORDS = 50; /** Active compaction operation tracked via session state (control-plane) */ export interface ActiveCompactionOperation { operationId: string; + /** Stream messageId for the compaction summary stream (set from stream-start) */ + streamMessageId: string | null; source: "user" | "force-compaction" | "idle-compaction"; continueMessage?: { text: string; @@ -129,6 +131,13 @@ export class CompactionHandler { return false; } + // Only treat this stream-end as compaction if it belongs to the compaction stream. + // This prevents unrelated stream-end events (e.g., a previous stream finishing after a + // graceful stop) from replacing history. + if (activeOperation.streamMessageId !== event.messageId) { + return false; + } + // Dedupe by operationId (prevents double-processing on reconnect) if (this.processedCompactionRequestIds.has(activeOperation.operationId)) { log.debug("Skipping already-processed compaction operation", { From 6990ff61da46633f2b698c4f170755bb28c4dcbd Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 14:09:35 -0600 Subject: [PATCH 3/9] =?UTF-8?q?=F0=9F=A4=96=20tests:=20update=20idle=20com?= =?UTF-8?q?paction=20handler=20hook=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hooks/useIdleCompactionHandler.test.ts | 103 +++++++++--------- 1 file changed, 50 insertions(+), 53 deletions(-) diff --git a/src/browser/hooks/useIdleCompactionHandler.test.ts b/src/browser/hooks/useIdleCompactionHandler.test.ts index 9e6aa4f554..05fc09355f 100644 --- a/src/browser/hooks/useIdleCompactionHandler.test.ts +++ b/src/browser/hooks/useIdleCompactionHandler.test.ts @@ -31,51 +31,27 @@ void mock.module("@/browser/stores/WorkspaceStore", () => ({ void mock.module("@/browser/hooks/useSendMessageOptions", () => ({ buildSendMessageOptions: () => ({ model: "test-model", - gateway: "anthropic", + thinkingLevel: undefined, + providerOptions: undefined, + experiments: undefined, }), })); -// Mock executeCompaction - tracks calls and can be configured per test -let executeCompactionCalls: Array<{ - api: unknown; - workspaceId: string; - sendMessageOptions: unknown; - source: string; -}> = []; -let executeCompactionResult: { success: true } | { success: false; error: string } = { +// Mock workspace.compactHistory - tracks calls and can be configured per test +let compactHistoryResolver: ((value: unknown) => void) | null = null; +let compactHistoryResult: + | { success: true; data: { operationId: string } } + | { success: false; error: unknown } = { success: true, + data: { operationId: "op-1" }, }; -let executeCompactionResolver: - | ((value: { success: true } | { success: false; error: string }) => void) - | null = null; - -void mock.module("@/browser/utils/chatCommands", () => ({ - executeCompaction: (opts: { - api: unknown; - workspaceId: string; - sendMessageOptions: unknown; - source: string; - }) => { - executeCompactionCalls.push(opts); - if (executeCompactionResolver) { - // Return a promise that hangs until manually resolved - return new Promise((resolve) => { - const savedResolver = executeCompactionResolver; - executeCompactionResolver = (val) => { - savedResolver?.(val); - resolve(val); - }; - }); - } - return Promise.resolve(executeCompactionResult); - }, -})); // Import after mocks are set up import { useIdleCompactionHandler } from "./useIdleCompactionHandler"; describe("useIdleCompactionHandler", () => { let mockApi: object; + let compactHistoryMock: ReturnType; let unsubscribeCalled: boolean; beforeEach(() => { @@ -83,16 +59,29 @@ describe("useIdleCompactionHandler", () => { globalThis.window = new GlobalWindow() as unknown as Window & typeof globalThis; globalThis.document = globalThis.window.document; - mockApi = { workspace: { sendMessage: mock() } }; + compactHistoryMock = mock((_args: unknown) => { + if (compactHistoryResolver) { + // Return a promise that hangs until manually resolved + return new Promise((resolve) => { + const savedResolver = compactHistoryResolver; + compactHistoryResolver = (val) => { + savedResolver?.(val); + resolve(val); + }; + }); + } + return Promise.resolve(compactHistoryResult); + }); + + mockApi = { workspace: { compactHistory: compactHistoryMock } }; unsubscribeCalled = false; mockUnsubscribe = () => { unsubscribeCalled = true; }; capturedCallback = null; onIdleCompactionNeededCallCount = 0; - executeCompactionCalls = []; - executeCompactionResult = { success: true }; - executeCompactionResolver = null; + compactHistoryResult = { success: true, data: { operationId: "op-1" } }; + compactHistoryResolver = null; }); afterEach(() => { @@ -122,7 +111,7 @@ describe("useIdleCompactionHandler", () => { expect(onIdleCompactionNeededCallCount).toBe(0); }); - test("calls executeCompaction when event received", async () => { + test("calls workspace.compactHistory when event received", async () => { renderHook(() => useIdleCompactionHandler({ api: mockApi as never })); expect(capturedCallback).not.toBeNull(); @@ -130,20 +119,25 @@ describe("useIdleCompactionHandler", () => { // Wait for async execution await Promise.resolve(); + await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); - expect(executeCompactionCalls[0]).toEqual({ - api: mockApi, + expect(compactHistoryMock.mock.calls).toHaveLength(1); + expect(compactHistoryMock.mock.calls[0][0]).toEqual({ workspaceId: "workspace-123", - sendMessageOptions: { model: "test-model", gateway: "anthropic" }, source: "idle-compaction", + sendMessageOptions: { + model: "test-model", + thinkingLevel: undefined, + providerOptions: undefined, + experiments: undefined, + }, }); }); test("prevents duplicate triggers for same workspace while in-flight", async () => { - // Make executeCompaction hang until we resolve it - this no-op will be replaced when promise is created + // Make compactHistory hang until we resolve it - this no-op will be replaced when promise is created // eslint-disable-next-line @typescript-eslint/no-empty-function - executeCompactionResolver = () => {}; + compactHistoryResolver = () => {}; renderHook(() => useIdleCompactionHandler({ api: mockApi as never })); @@ -156,11 +150,12 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); // Should only have called once - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); // Resolve the first compaction - executeCompactionResolver({ success: true }); + compactHistoryResolver({ success: true, data: { operationId: "op-1" } }); await Promise.resolve(); + await Promise.resolve(); // Extra tick for .finally() }); test("allows different workspaces to compact simultaneously", async () => { @@ -170,7 +165,7 @@ describe("useIdleCompactionHandler", () => { capturedCallback!("workspace-2"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); }); test("clears workspace from triggered set after success", async () => { @@ -181,18 +176,19 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); + await Promise.resolve(); // Extra tick for .finally() // Should be able to trigger again after completion capturedCallback!("workspace-123"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); }); test("clears workspace from triggered set after failure", async () => { // Make first call fail - executeCompactionResult = { success: false, error: "test error" }; + compactHistoryResult = { success: false, error: "test error" }; // Suppress console.error for this test const originalError = console.error; @@ -205,13 +201,14 @@ describe("useIdleCompactionHandler", () => { await Promise.resolve(); await Promise.resolve(); // Extra tick for .then() - expect(executeCompactionCalls).toHaveLength(1); + expect(compactHistoryMock.mock.calls).toHaveLength(1); + await Promise.resolve(); // Extra tick for .finally() // Should be able to trigger again after failure capturedCallback!("workspace-123"); await Promise.resolve(); - expect(executeCompactionCalls).toHaveLength(2); + expect(compactHistoryMock.mock.calls).toHaveLength(2); console.error = originalError; }); From d1d15f5c0f7dcbb9c6642c950e2d944ecb1488fd Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 14:30:09 -0600 Subject: [PATCH 4/9] =?UTF-8?q?=F0=9F=A4=96=20fix:=20preserve=20partial=20?= =?UTF-8?q?output=20when=20interrupting=20for=20compaction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/components/AIView.tsx | 2 +- src/browser/utils/chatCommands.ts | 5 +++-- src/common/orpc/schemas/api.ts | 4 ++-- src/node/services/workspaceService.ts | 10 ++++++---- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/browser/components/AIView.tsx b/src/browser/components/AIView.tsx index fc691d537e..0a0cd20b38 100644 --- a/src/browser/components/AIView.tsx +++ b/src/browser/components/AIView.tsx @@ -231,7 +231,7 @@ const AIViewInner: React.FC = ({ void api.workspace.compactHistory({ workspaceId, source: "force-compaction", - interrupt: "abandonPartial", + interrupt: "abort", continueMessage: { text: "Continue" }, sendMessageOptions: { model: pendingSendOptions.model, diff --git a/src/browser/utils/chatCommands.ts b/src/browser/utils/chatCommands.ts index c378d0950d..e6b3529820 100644 --- a/src/browser/utils/chatCommands.ts +++ b/src/browser/utils/chatCommands.ts @@ -721,8 +721,9 @@ export async function executeCompaction( maxOutputTokens: options.maxOutputTokens, continueMessage, source, - // For edits, we need to interrupt and abandon partial - interrupt: options.editMessageId ? "abandonPartial" : undefined, + // For edits, interrupt any active stream before compacting. + // We preserve partial output; compaction will commit partial.json before summarizing. + interrupt: options.editMessageId ? "abort" : undefined, sendMessageOptions: { model: options.sendMessageOptions.model, thinkingLevel: options.sendMessageOptions.thinkingLevel, diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 2ec38cd6ff..55b46f4c56 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -355,8 +355,8 @@ export const workspace = { .optional(), /** Source of the compaction request for telemetry/behavior */ source: z.enum(["user", "force-compaction", "idle-compaction"]).optional(), - /** How to handle an active stream: none (fail if streaming), graceful (wait), abandonPartial (abort) */ - interrupt: z.enum(["none", "graceful", "abandonPartial"]).optional(), + /** How to handle an active stream: none (fail if streaming), graceful (wait), abort (interrupt immediately) */ + interrupt: z.enum(["none", "graceful", "abort"]).optional(), /** Send message options (model, thinking level, etc.) for compaction stream */ sendMessageOptions: SendMessageOptionsSchema.omit({ editMessageId: true, diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index cbb63cb951..3f0524e186 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -1894,7 +1894,7 @@ export class WorkspaceService extends EventEmitter { mode?: "exec" | "plan"; }; source?: "user" | "force-compaction" | "idle-compaction"; - interrupt?: "none" | "graceful" | "abandonPartial"; + interrupt?: "none" | "graceful" | "abort"; sendMessageOptions?: Omit; } ): Promise> { @@ -1943,9 +1943,11 @@ export class WorkspaceService extends EventEmitter { // Wait for stream to complete (soft interrupt) await session.interruptStream({ soft: true }); break; - case "abandonPartial": - // Abort immediately - await session.interruptStream({ abandonPartial: true }); + case "abort": + // Interrupt immediately. + // NOTE: We intentionally preserve any streamed partial output. + // startCompaction() will commit partial.json to history before building the compaction prompt. + await session.interruptStream(); break; } } From b5f9d3236a14d6b701a5e7a68ff7a1347d49fcfd Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 14:38:57 -0600 Subject: [PATCH 5/9] =?UTF-8?q?=F0=9F=A4=96=20refactor:=20make=20compactio?= =?UTF-8?q?n=20streamMessage=20call=20readable?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node/services/agentSession.ts | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 41a3638b18..c2a762235d 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -734,21 +734,31 @@ export class AgentSession { : undefined; // Stream compaction (no history persistence for the prompt message) + const abortSignal = undefined; + const additionalSystemInstructions = undefined; + const maxOutputTokens = options.maxOutputTokens; + const muxProviderOptions = options.sendMessageOptions?.providerOptions; + const mode = undefined; + const recordFileState = undefined; + const changedFileAttachments = undefined; + const postCompactionAttachments = undefined; + const experiments = options.sendMessageOptions?.experiments; + return this.aiService.streamMessage( compactionMessages, this.workspaceId, model, effectiveThinkingLevel, disableAllTools, - undefined, - undefined, - options.maxOutputTokens, - options.sendMessageOptions?.providerOptions, - undefined, - undefined, - undefined, - undefined, - options.sendMessageOptions?.experiments + abortSignal, + additionalSystemInstructions, + maxOutputTokens, + muxProviderOptions, + mode, + recordFileState, + changedFileAttachments, + postCompactionAttachments, + experiments ); } From 5120de6355f66dad4dc6d86c65a486aaf1b4b1ce Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 14:51:33 -0600 Subject: [PATCH 6/9] =?UTF-8?q?=F0=9F=A4=96=20tests:=20add=20ipc=20compact?= =?UTF-8?q?History=20integration=20test=20(Haiku)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/ipc/compactHistory.test.ts | 123 +++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tests/ipc/compactHistory.test.ts diff --git a/tests/ipc/compactHistory.test.ts b/tests/ipc/compactHistory.test.ts new file mode 100644 index 0000000000..43e0306d5e --- /dev/null +++ b/tests/ipc/compactHistory.test.ts @@ -0,0 +1,123 @@ +/** + * compactHistory integration tests. + * + * Ensures compaction is a control-plane operation (not a slash-command string), and that + * history is replaced only on successful compaction completion. + * + * Requirements: + * - Uses the Haiku model for both normal messages and compaction + * - Builds history by sending messages (replicates user behavior) + */ + +import { shouldRunIntegrationTests, validateApiKeys } from "./setup"; +import { + createSharedRepo, + cleanupSharedRepo, + withSharedWorkspace, + configureTestRetries, +} from "./sendMessageTestHelpers"; +import { assertStreamSuccess, modelString, sendMessageWithModel } from "./helpers"; +import { KNOWN_MODELS } from "../../src/common/constants/knownModels"; + +// Skip all tests if TEST_INTEGRATION is not set +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +if (shouldRunIntegrationTests()) { + validateApiKeys(["ANTHROPIC_API_KEY"]); +} + +beforeAll(createSharedRepo); +afterAll(cleanupSharedRepo); + +describeIntegration("compactHistory integration tests", () => { + configureTestRetries(3); + + test.concurrent( + "should compact history using Haiku for both messages + compaction", + async () => { + await withSharedWorkspace("anthropic", async ({ env, workspaceId, collector }) => { + const haiku = modelString("anthropic", KNOWN_MODELS.HAIKU.providerModelId); + + // Build history via normal user interactions. + collector.clear(); + + const message1 = + "You are helping me plan a small refactor. Explain, in a few sentences, what the risks are when refactoring code without tests."; + const result1 = await sendMessageWithModel(env, workspaceId, message1, haiku); + expect(result1.success).toBe(true); + const streamEnd1 = await collector.waitForEvent("stream-end", 20000); + expect(streamEnd1).not.toBeNull(); + expect((streamEnd1 as { metadata: { model?: string } }).metadata.model).toBe(haiku); + assertStreamSuccess(collector); + + collector.clear(); + + const message2 = + "Now list three concrete steps I should take to refactor safely. Include enough detail that it would be useful in a code review."; + const result2 = await sendMessageWithModel(env, workspaceId, message2, haiku); + expect(result2.success).toBe(true); + const streamEnd2 = await collector.waitForEvent("stream-end", 20000); + expect(streamEnd2).not.toBeNull(); + expect((streamEnd2 as { metadata: { model?: string } }).metadata.model).toBe(haiku); + assertStreamSuccess(collector); + + collector.clear(); + + // Trigger compaction explicitly via the control-plane API. + const compactResult = await env.orpc.workspace.compactHistory({ + workspaceId, + model: haiku, + maxOutputTokens: 800, + source: "user", + interrupt: "none", + sendMessageOptions: { + model: haiku, + thinkingLevel: "off", + }, + }); + + expect(compactResult.success).toBe(true); + if (!compactResult.success) { + throw new Error(String(compactResult.error)); + } + + // Ensure this stream is actually the compaction stream. + const streamStart = await collector.waitForEvent("stream-start", 20000); + expect(streamStart).not.toBeNull(); + const compactionMessageId = (streamStart as { messageId: string }).messageId; + + const streamEnd = await collector.waitForEvent("stream-end", 30000); + expect(streamEnd).not.toBeNull(); + expect((streamEnd as { messageId: string }).messageId).toBe(compactionMessageId); + expect((streamEnd as { metadata: { model?: string } }).metadata.model).toBe(haiku); + assertStreamSuccess(collector); + + // The compaction handler emits a single summary message + delete event. + const deleteEvent = collector.getEvents().find((e) => e.type === "delete"); + expect(deleteEvent).toBeDefined(); + + const summaryMessage = collector + .getEvents() + .find((e) => e.type === "message" && e.role === "assistant" && e.metadata?.compacted); + expect(summaryMessage).toBeDefined(); + expect((summaryMessage as { metadata?: { model?: string } }).metadata?.model).toBe(haiku); + + // Verify persisted history was replaced (user behavior: reload workspace). + const replay = await env.orpc.workspace.getFullReplay({ workspaceId }); + const replayMessages = replay.filter((m) => m.type === "message"); + + // After compaction we should only have a single assistant summary message. + expect(replayMessages).toHaveLength(1); + expect(replayMessages[0].role).toBe("assistant"); + expect(replayMessages[0].metadata?.compacted).toBeDefined(); + expect(replayMessages[0].metadata?.model).toBe(haiku); + + // Sanity check: original user prompt text should not be present after replacement. + const replayText = JSON.stringify(replayMessages[0]); + expect(replayText).not.toContain("refactoring code without tests"); + expect(replayText).not.toContain("three concrete steps"); + }); + }, + 90000 + ); +}); From cf9d676b095ad2f27de5c07045bd510da1e1dc77 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 15:13:58 -0600 Subject: [PATCH 7/9] fix: send continueMessage after compaction completes --- src/node/services/agentSession.ts | 10 ++ tests/ipc/compactHistory.test.ts | 199 +++++++++++++++++++++--------- tests/ipc/helpers.ts | 30 +++++ 3 files changed, 180 insertions(+), 59 deletions(-) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index c2a762235d..e9e1c36c86 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -1099,6 +1099,16 @@ export class AgentSession { return; } + // If a compaction operation is active, do NOT auto-send queued messages. + // + // Why: if an earlier stream (the one we interrupted to start compaction) emits a late + // stream-end/tool-call-end event, it can trigger sendQueuedMessages() while compaction is + // still streaming. That can cause the queued continue message to be sent too early and + // then wiped by the compaction history replacement. + if (this.activeCompactionOperation) { + return; + } + // Clear the queued message flag (even if queue is empty, to handle race conditions) this.backgroundProcessManager.setMessageQueued(this.workspaceId, false); diff --git a/tests/ipc/compactHistory.test.ts b/tests/ipc/compactHistory.test.ts index 43e0306d5e..dff7512ff8 100644 --- a/tests/ipc/compactHistory.test.ts +++ b/tests/ipc/compactHistory.test.ts @@ -1,14 +1,16 @@ /** * compactHistory integration tests. * - * Ensures compaction is a control-plane operation (not a slash-command string), and that - * history is replaced only on successful compaction completion. + * Ensures compaction is a control-plane operation (not a slash-command string), and that: + * - History is replaced only on successful compaction completion + * - continueMessage is auto-sent after compaction completes * * Requirements: - * - Uses the Haiku model for both normal messages and compaction - * - Builds history by sending messages (replicates user behavior) + * - Uses the Haiku model for both compaction and the follow-up continue message + * - Seeds history via HistoryService (test-only) to avoid extra API calls */ +import type { WorkspaceChatMessage } from "@/common/orpc/types"; import { shouldRunIntegrationTests, validateApiKeys } from "./setup"; import { createSharedRepo, @@ -16,7 +18,7 @@ import { withSharedWorkspace, configureTestRetries, } from "./sendMessageTestHelpers"; -import { assertStreamSuccess, modelString, sendMessageWithModel } from "./helpers"; +import { modelString, seedHistoryMessages } from "./helpers"; import { KNOWN_MODELS } from "../../src/common/constants/knownModels"; // Skip all tests if TEST_INTEGRATION is not set @@ -29,47 +31,78 @@ if (shouldRunIntegrationTests()) { beforeAll(createSharedRepo); afterAll(cleanupSharedRepo); +function getTextFromMessageParts(message: { + parts?: Array<{ type: string; text?: string }>; +}): string { + return ( + message.parts + ?.filter((part) => part.type === "text") + .map((part) => part.text ?? "") + .join("") ?? "" + ); +} + +async function waitForMatchingEvent( + collector: { getEvents: () => WorkspaceChatMessage[] }, + predicate: (event: WorkspaceChatMessage) => boolean, + timeoutMs: number +): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + const match = collector.getEvents().find(predicate); + if (match) { + return match; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + return null; +} + describeIntegration("compactHistory integration tests", () => { configureTestRetries(3); test.concurrent( - "should compact history using Haiku for both messages + compaction", + "should compact history and then auto-send continueMessage", async () => { await withSharedWorkspace("anthropic", async ({ env, workspaceId, collector }) => { const haiku = modelString("anthropic", KNOWN_MODELS.HAIKU.providerModelId); - // Build history via normal user interactions. - collector.clear(); - - const message1 = - "You are helping me plan a small refactor. Explain, in a few sentences, what the risks are when refactoring code without tests."; - const result1 = await sendMessageWithModel(env, workspaceId, message1, haiku); - expect(result1.success).toBe(true); - const streamEnd1 = await collector.waitForEvent("stream-end", 20000); - expect(streamEnd1).not.toBeNull(); - expect((streamEnd1 as { metadata: { model?: string } }).metadata.model).toBe(haiku); - assertStreamSuccess(collector); + // Seed history quickly (test-only) to avoid extra API calls. + const seededIds = await seedHistoryMessages(workspaceId, env.config, [ + { + id: "seed-user-0", + role: "user", + content: "Context: we are discussing a small code refactor.", + }, + { + id: "seed-assistant-0", + role: "assistant", + content: "Acknowledged. I will help.", + }, + { + id: "seed-user-1", + role: "user", + content: "Please keep responses short and practical.", + }, + { + id: "seed-assistant-1", + role: "assistant", + content: "Understood.", + }, + ]); collector.clear(); - const message2 = - "Now list three concrete steps I should take to refactor safely. Include enough detail that it would be useful in a code review."; - const result2 = await sendMessageWithModel(env, workspaceId, message2, haiku); - expect(result2.success).toBe(true); - const streamEnd2 = await collector.waitForEvent("stream-end", 20000); - expect(streamEnd2).not.toBeNull(); - expect((streamEnd2 as { metadata: { model?: string } }).metadata.model).toBe(haiku); - assertStreamSuccess(collector); - - collector.clear(); + const continueText = "Continue: reply with exactly 'OK'."; - // Trigger compaction explicitly via the control-plane API. + // Trigger compaction via the control-plane API. const compactResult = await env.orpc.workspace.compactHistory({ workspaceId, model: haiku, maxOutputTokens: 800, source: "user", interrupt: "none", + continueMessage: { text: continueText }, sendMessageOptions: { model: haiku, thinkingLevel: "off", @@ -81,41 +114,89 @@ describeIntegration("compactHistory integration tests", () => { throw new Error(String(compactResult.error)); } - // Ensure this stream is actually the compaction stream. - const streamStart = await collector.waitForEvent("stream-start", 20000); - expect(streamStart).not.toBeNull(); - const compactionMessageId = (streamStart as { messageId: string }).messageId; - - const streamEnd = await collector.waitForEvent("stream-end", 30000); - expect(streamEnd).not.toBeNull(); - expect((streamEnd as { messageId: string }).messageId).toBe(compactionMessageId); - expect((streamEnd as { metadata: { model?: string } }).metadata.model).toBe(haiku); - assertStreamSuccess(collector); - - // The compaction handler emits a single summary message + delete event. - const deleteEvent = collector.getEvents().find((e) => e.type === "delete"); - expect(deleteEvent).toBeDefined(); - - const summaryMessage = collector - .getEvents() - .find((e) => e.type === "message" && e.role === "assistant" && e.metadata?.compacted); - expect(summaryMessage).toBeDefined(); - expect((summaryMessage as { metadata?: { model?: string } }).metadata?.model).toBe(haiku); - - // Verify persisted history was replaced (user behavior: reload workspace). + // Wait for compaction stream to start + end. + const compactionStreamStart = await collector.waitForEvent("stream-start", 20000); + expect(compactionStreamStart).not.toBeNull(); + const compactionMessageId = (compactionStreamStart as { messageId: string }).messageId; + + const compactionStreamEnd = await waitForMatchingEvent( + collector, + (e) => + e.type === "stream-end" && + (e as { messageId?: string }).messageId === compactionMessageId, + 45000 + ); + expect(compactionStreamEnd).not.toBeNull(); + expect((compactionStreamEnd as { metadata: { model?: string } }).metadata.model).toBe( + haiku + ); + + // Compaction should emit delete + summary message. + const deleteEvent = await waitForMatchingEvent( + collector, + (e) => e.type === "delete", + 10000 + ); + expect(deleteEvent).not.toBeNull(); + + const summaryMessage = await waitForMatchingEvent( + collector, + (e) => e.type === "message" && e.role === "assistant" && Boolean(e.metadata?.compacted), + 10000 + ); + expect(summaryMessage).not.toBeNull(); + + // Continue message should be persisted as a user message and then streamed. + const continueUserMessage = await waitForMatchingEvent( + collector, + (e) => + e.type === "message" && + e.role === "user" && + getTextFromMessageParts(e) === continueText, + 20000 + ); + expect(continueUserMessage).not.toBeNull(); + + const continueStreamStart = await waitForMatchingEvent( + collector, + (e) => + e.type === "stream-start" && + (e as { messageId?: string }).messageId !== compactionMessageId && + (e as { metadata?: { model?: string } }).metadata?.model === haiku, + 20000 + ); + expect(continueStreamStart).not.toBeNull(); + + const continueMessageId = (continueStreamStart as { messageId: string }).messageId; + const continueStreamEnd = await waitForMatchingEvent( + collector, + (e) => + e.type === "stream-end" && + (e as { messageId?: string }).messageId === continueMessageId, + 45000 + ); + expect(continueStreamEnd).not.toBeNull(); + + // Verify persisted history: + // - seeded messages were removed + // - summary exists + // - continue message exists const replay = await env.orpc.workspace.getFullReplay({ workspaceId }); const replayMessages = replay.filter((m) => m.type === "message"); - // After compaction we should only have a single assistant summary message. - expect(replayMessages).toHaveLength(1); - expect(replayMessages[0].role).toBe("assistant"); - expect(replayMessages[0].metadata?.compacted).toBeDefined(); - expect(replayMessages[0].metadata?.model).toBe(haiku); + for (const id of seededIds) { + expect(replayMessages.some((m) => m.id === id)).toBe(false); + } + + const summaryIndex = replayMessages.findIndex( + (m) => m.role === "assistant" && Boolean(m.metadata?.compacted) + ); + expect(summaryIndex).toBe(0); - // Sanity check: original user prompt text should not be present after replacement. - const replayText = JSON.stringify(replayMessages[0]); - expect(replayText).not.toContain("refactoring code without tests"); - expect(replayText).not.toContain("three concrete steps"); + const continueIndex = replayMessages.findIndex( + (m) => m.role === "user" && getTextFromMessageParts(m) === continueText + ); + expect(continueIndex).toBeGreaterThan(summaryIndex); }); }, 90000 diff --git a/tests/ipc/helpers.ts b/tests/ipc/helpers.ts index bc0ceae9ec..c69745acb2 100644 --- a/tests/ipc/helpers.ts +++ b/tests/ipc/helpers.ts @@ -590,6 +590,36 @@ export async function cleanupTempGitRepo(repoPath: string): Promise { console.warn(`Failed to cleanup temp git repo after ${maxRetries} attempts:`, lastError); } +/** + * Seed a workspace history with explicit messages. + * + * Test-only: uses HistoryService directly to populate chat.jsonl without making API calls. + * Real application code should NEVER bypass IPC like this. + */ +export async function seedHistoryMessages( + workspaceId: string, + config: { getSessionDir: (id: string) => string }, + messages: Array<{ id?: string; role: "user" | "assistant"; content: string }> +): Promise { + // HistoryService only needs getSessionDir, so we can cast the partial config. + const historyService = new HistoryService(config as any); + + const ids: string[] = []; + for (let i = 0; i < messages.length; i++) { + const entry = messages[i]; + const id = entry.id ?? `seed-msg-${i}`; + ids.push(id); + + const message = createMuxMessage(id, entry.role, entry.content, {}); + const result = await historyService.appendToHistory(workspaceId, message); + if (!result.success) { + throw new Error(`Failed to append history message ${i} (${id}): ${result.error}`); + } + } + + return ids; +} + /** * Build large conversation history to test context limits * From e1339e448a0f2a5c4dc904eb7ea0645deff2c662 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 17:51:07 -0600 Subject: [PATCH 8/9] fix: ensure compaction clears UI history and continue stream detectable --- src/node/services/compactionHandler.test.ts | 25 ++++-- src/node/services/compactionHandler.ts | 15 +++- tests/ipc/compactHistory.test.ts | 87 ++++++--------------- 3 files changed, 59 insertions(+), 68 deletions(-) diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index ae5473cc9c..ecbbac3a3d 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -78,9 +78,9 @@ const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } }; /** Helper: create a normal user message (not compaction) */ -const createNormalUserMessage = (id = "msg-1"): MuxMessage => +const createNormalUserMessage = (id = "msg-1", historySequence = 0): MuxMessage => createMuxMessage(id, "user", "Hello, how are you?", { - historySequence: 0, + historySequence, muxMetadata: { type: "normal" }, }); @@ -289,8 +289,15 @@ describe("CompactionHandler", () => { it("should emit delete event for old messages", async () => { setActiveOperation("op-1"); - mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); - mockHistoryService.mockReplaceHistory(Ok([0, 1, 2, 3])); + mockHistoryService.mockGetHistory( + Ok([ + createNormalUserMessage("msg-0", 0), + createNormalUserMessage("msg-1", 1), + createNormalUserMessage("msg-2", 2), + createNormalUserMessage("msg-3", 3), + ]) + ); + mockHistoryService.mockReplaceHistory(Ok([])); const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); @@ -525,8 +532,14 @@ describe("CompactionHandler", () => { it("should emit DeleteMessage with correct type and historySequences array", async () => { setActiveOperation("op-1"); - mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); - mockHistoryService.mockReplaceHistory(Ok([5, 10, 15])); + mockHistoryService.mockGetHistory( + Ok([ + createNormalUserMessage("msg-5", 5), + createNormalUserMessage("msg-10", 10), + createNormalUserMessage("msg-15", 15), + ]) + ); + mockHistoryService.mockReplaceHistory(Ok([])); const event = createStreamEndEvent(createValidSummary()); await handler.handleCompletion(event); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 9ae33b2903..0e8ab673bb 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -308,6 +308,9 @@ export class CompactionHandler { "assistant", summary, { + // Ensures the UI can later delete/replace the summary message (e.g., on /clear). + // replaceHistory() will normalize sequences starting at 0. + historySequence: 0, timestamp, compacted: isIdleCompaction ? "idle" : "user", model: metadata.model, @@ -318,6 +321,17 @@ export class CompactionHandler { } ); + // Emit delete events based on the history we used for compaction. + // + // Why not rely on HistoryService.replaceHistory()'s deleted sequences? + // - The UI currently has these messages in memory (from streaming), so this is the + // authoritative list to clear from the transcript. + // - It avoids edge cases where chat.jsonl parsing skips a malformed trailing line, + // resulting in an empty deletedSequences list and a non-updating UI. + const deletedSequences = messages + .map((msg) => msg.metadata?.historySequence ?? -1) + .filter((s) => s >= 0); + // Atomically replace history with the single summary message. // This avoids the "delete then crash" failure mode. const replaceResult = await this.historyService.replaceHistory(this.workspaceId, [ @@ -326,7 +340,6 @@ export class CompactionHandler { if (!replaceResult.success) { return Err(`Failed to replace history: ${replaceResult.error}`); } - const deletedSequences = replaceResult.data; // Set flag to trigger post-compaction attachment injection on next turn this.postCompactionAttachmentsPending = true; diff --git a/tests/ipc/compactHistory.test.ts b/tests/ipc/compactHistory.test.ts index dff7512ff8..81cce2bc1d 100644 --- a/tests/ipc/compactHistory.test.ts +++ b/tests/ipc/compactHistory.test.ts @@ -10,7 +10,6 @@ * - Seeds history via HistoryService (test-only) to avoid extra API calls */ -import type { WorkspaceChatMessage } from "@/common/orpc/types"; import { shouldRunIntegrationTests, validateApiKeys } from "./setup"; import { createSharedRepo, @@ -42,22 +41,6 @@ function getTextFromMessageParts(message: { ); } -async function waitForMatchingEvent( - collector: { getEvents: () => WorkspaceChatMessage[] }, - predicate: (event: WorkspaceChatMessage) => boolean, - timeoutMs: number -): Promise { - const start = Date.now(); - while (Date.now() - start < timeoutMs) { - const match = collector.getEvents().find(predicate); - if (match) { - return match; - } - await new Promise((resolve) => setTimeout(resolve, 25)); - } - return null; -} - describeIntegration("compactHistory integration tests", () => { configureTestRetries(3); @@ -115,67 +98,49 @@ describeIntegration("compactHistory integration tests", () => { } // Wait for compaction stream to start + end. - const compactionStreamStart = await collector.waitForEvent("stream-start", 20000); + const compactionStreamStart = await collector.waitForEventN("stream-start", 1, 20000); expect(compactionStreamStart).not.toBeNull(); const compactionMessageId = (compactionStreamStart as { messageId: string }).messageId; - const compactionStreamEnd = await waitForMatchingEvent( - collector, - (e) => - e.type === "stream-end" && - (e as { messageId?: string }).messageId === compactionMessageId, - 45000 - ); + const compactionStreamEnd = await collector.waitForEventN("stream-end", 1, 45000); expect(compactionStreamEnd).not.toBeNull(); + expect((compactionStreamEnd as { messageId: string }).messageId).toBe(compactionMessageId); expect((compactionStreamEnd as { metadata: { model?: string } }).metadata.model).toBe( haiku ); // Compaction should emit delete + summary message. - const deleteEvent = await waitForMatchingEvent( - collector, - (e) => e.type === "delete", - 10000 - ); + const deleteEvent = await collector.waitForEvent("delete", 10000); expect(deleteEvent).not.toBeNull(); - const summaryMessage = await waitForMatchingEvent( - collector, - (e) => e.type === "message" && e.role === "assistant" && Boolean(e.metadata?.compacted), - 10000 - ); - expect(summaryMessage).not.toBeNull(); + const summaryMessage = collector + .getEvents() + .find( + (e) => e.type === "message" && e.role === "assistant" && Boolean(e.metadata?.compacted) + ); + expect(summaryMessage).toBeDefined(); // Continue message should be persisted as a user message and then streamed. - const continueUserMessage = await waitForMatchingEvent( - collector, - (e) => - e.type === "message" && - e.role === "user" && - getTextFromMessageParts(e) === continueText, - 20000 - ); - expect(continueUserMessage).not.toBeNull(); - - const continueStreamStart = await waitForMatchingEvent( - collector, - (e) => - e.type === "stream-start" && - (e as { messageId?: string }).messageId !== compactionMessageId && - (e as { metadata?: { model?: string } }).metadata?.model === haiku, - 20000 - ); + // We expect a second stream-start for the follow-up message. + const continueStreamStart = await collector.waitForEventN("stream-start", 2, 20000); expect(continueStreamStart).not.toBeNull(); const continueMessageId = (continueStreamStart as { messageId: string }).messageId; - const continueStreamEnd = await waitForMatchingEvent( - collector, - (e) => - e.type === "stream-end" && - (e as { messageId?: string }).messageId === continueMessageId, - 45000 - ); + expect(continueMessageId).not.toBe(compactionMessageId); + + const continueUserMessage = collector + .getEvents() + .find( + (e) => + e.type === "message" && + e.role === "user" && + getTextFromMessageParts(e) === continueText + ); + expect(continueUserMessage).toBeDefined(); + + const continueStreamEnd = await collector.waitForEventN("stream-end", 2, 45000); expect(continueStreamEnd).not.toBeNull(); + expect((continueStreamEnd as { messageId: string }).messageId).toBe(continueMessageId); // Verify persisted history: // - seeded messages were removed From 69042599133773483f4aab533d24fc735ae2f357 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 21 Dec 2025 21:58:01 -0600 Subject: [PATCH 9/9] fix: allow short compaction summaries --- src/node/services/compactionHandler.test.ts | 8 +++----- src/node/services/compactionHandler.ts | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index ecbbac3a3d..30f2b93f4b 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -380,14 +380,12 @@ describe("CompactionHandler", () => { expect(clearActiveOperationCalled).toBe(true); }); - it("should reject summary that is too short (<50 words)", async () => { + it("should reject summary that is too short (<10 words)", async () => { setActiveOperation("op-1"); mockHistoryService.mockGetHistory(Ok([createNormalUserMessage()])); - // Only ~10 words - const event = createStreamEndEvent( - "This is a very short summary that won't pass validation." - ); + // Only 9 words + const event = createStreamEndEvent("one two three four five six seven eight nine"); const result = await handler.handleCompletion(event); expect(result).toBe(true); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 0e8ab673bb..2d222ecfdf 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -19,7 +19,7 @@ import { import { computeRecencyFromMessages } from "@/common/utils/recency"; /** Minimum word count for a valid compaction summary */ -const MIN_SUMMARY_WORDS = 50; +const MIN_SUMMARY_WORDS = 10; /** Active compaction operation tracked via session state (control-plane) */ export interface ActiveCompactionOperation {