diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts
index 4967560830..52577a2334 100644
--- a/apps/sim/lib/knowledge/documents/service.ts
+++ b/apps/sim/lib/knowledge/documents/service.ts
@@ -748,6 +748,126 @@ async function processDocumentsWithTrigger(
   }
 }
 
+interface NewDocumentRow {
+  id: string
+  knowledgeBaseId: string
+  filename: string
+  fileUrl: string
+  fileSize: number
+  mimeType: string
+  chunkCount: number
+  tokenCount: number
+  characterCount: number
+  processingStatus: 'pending'
+  enabled: boolean
+  uploadedAt: Date
+  tag1: string | null
+  tag2: string | null
+  tag3: string | null
+  tag4: string | null
+  tag5: string | null
+  tag6: string | null
+  tag7: string | null
+  number1: number | null
+  number2: number | null
+  number3: number | null
+  number4: number | null
+  number5: number | null
+  date1: Date | null
+  date2: Date | null
+  boolean1: boolean | null
+  boolean2: boolean | null
+  boolean3: boolean | null
+}
+
+/**
+ * Insert N document rows IF the parent knowledge base is still alive
+ * (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the
+ * number of rows actually inserted.
+ *
+ * Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent
+ * delete — the KB row physically remains. We do the existence check and the
+ * insert in a single statement via INSERT...SELECT...WHERE EXISTS, which
+ * Postgres evaluates atomically. No transaction or row lock required, no
+ * race window between check and insert.
+ *
+ * Returns 0 if the KB was soft-deleted; caller throws.
+ */
+async function insertDocumentsIfKbAlive(
+  rows: NewDocumentRow[],
+  knowledgeBaseId: string
+): Promise<number> {
+  if (rows.length === 0) return 0
+
+  // jsonb_to_recordset declares the column types once, so we don't need to
+  // cast every parameter individually to keep Postgres' type inference happy
+  // when nullable columns end up all-NULL across the batch.
+  const jsonRows = rows.map((d) => ({
+    id: d.id,
+    knowledge_base_id: d.knowledgeBaseId,
+    filename: d.filename,
+    file_url: d.fileUrl,
+    file_size: d.fileSize,
+    mime_type: d.mimeType,
+    chunk_count: d.chunkCount,
+    token_count: d.tokenCount,
+    character_count: d.characterCount,
+    processing_status: d.processingStatus,
+    enabled: d.enabled,
+    uploaded_at: d.uploadedAt.toISOString(),
+    tag1: d.tag1,
+    tag2: d.tag2,
+    tag3: d.tag3,
+    tag4: d.tag4,
+    tag5: d.tag5,
+    tag6: d.tag6,
+    tag7: d.tag7,
+    number1: d.number1,
+    number2: d.number2,
+    number3: d.number3,
+    number4: d.number4,
+    number5: d.number5,
+    date1: d.date1?.toISOString() ?? null,
+    date2: d.date2?.toISOString() ?? null,
+    boolean1: d.boolean1,
+    boolean2: d.boolean2,
+    boolean3: d.boolean3,
+  }))
+
+  const result = await db.execute(sql`
+    INSERT INTO document (
+      id, knowledge_base_id, filename, file_url, file_size, mime_type,
+      chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
+      tag1, tag2, tag3, tag4, tag5, tag6, tag7,
+      number1, number2, number3, number4, number5,
+      date1, date2,
+      boolean1, boolean2, boolean3
+    )
+    SELECT
+      id, knowledge_base_id, filename, file_url, file_size, mime_type,
+      chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
+      tag1, tag2, tag3, tag4, tag5, tag6, tag7,
+      number1, number2, number3, number4, number5,
+      date1, date2,
+      boolean1, boolean2, boolean3
+    FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x(
+      id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text,
+      chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp,
+      tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text,
+      number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision,
+      date1 timestamp, date2 timestamp,
+      boolean1 boolean, boolean2 boolean, boolean3 boolean
+    )
+    WHERE EXISTS (
+      SELECT 1 FROM knowledge_base
+      WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL
+    )
+    RETURNING id
+  `)
+
+  return Array.from(result).length
+}
+
 export async function createDocumentRecords(
   documents: Array<{
     filename: string
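Reviewer note on the hunk above: the guard holds because the EXISTS check is part of the same statement that produces the inserted rows, so both are evaluated against one snapshot. Below is a minimal sketch of the pattern in isolation, using a hypothetical `parent`/`child` schema and the postgres.js client that `packages/db` already depends on; the table and function names are illustrative, not the real ones.

```ts
import postgres from 'postgres'

// Hypothetical tables, for illustration only:
//   parent(id text primary key, deleted_at timestamptz)
//   child(id text primary key, parent_id text, name text)
const sql = postgres(process.env.DATABASE_URL!)

async function insertChildIfParentAlive(childId: string, parentId: string, name: string) {
  // The SELECT both supplies the row values and gates the insert: if the
  // EXISTS subquery sees a soft-deleted parent, the SELECT yields zero rows
  // and nothing is inserted. One statement, one snapshot, no race window.
  const inserted = await sql`
    INSERT INTO child (id, parent_id, name)
    SELECT ${childId}, ${parentId}, ${name}
    WHERE EXISTS (
      SELECT 1 FROM parent WHERE id = ${parentId} AND deleted_at IS NULL
    )
    RETURNING id
  `
  return inserted.length // 0 means the parent was soft-deleted concurrently
}
```

The `jsonb_to_recordset` batching in the real function is orthogonal to the guard: it only exists so a single statement can insert many rows with explicit column types.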
@@ -766,99 +886,102 @@ export async function createDocumentRecords(
   knowledgeBaseId: string,
   requestId: string
 ): Promise<DocumentData[]> {
-  return await db.transaction(async (tx) => {
-    await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
-
-    const kb = await tx
-      .select({ id: knowledgeBase.id })
-      .from(knowledgeBase)
-      .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
-      .limit(1)
+  // Cheap upfront existence check so the common KB-not-found path fails fast
+  // before we burn CPU on tag processing. The atomic insert below is the
+  // race-safe guard against a concurrent KB soft-delete in the small window
+  // between this check and the insert.
+  const kb = await db
+    .select({ id: knowledgeBase.id })
+    .from(knowledgeBase)
+    .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
+    .limit(1)
 
-    if (kb.length === 0) {
-      throw new Error('Knowledge base not found')
-    }
+  if (kb.length === 0) {
+    throw new Error('Knowledge base not found')
+  }
 
-    const now = new Date()
-    const documentRecords = []
-    const returnData: DocumentData[] = []
+  const now = new Date()
+  const documentRecords: NewDocumentRow[] = []
+  const returnData: DocumentData[] = []
 
-    for (const docData of documents) {
-      const documentId = generateId()
+  for (const docData of documents) {
+    const documentId = generateId()
 
-      let processedTags: Partial = {}
+    let processedTags: Partial = {}
 
-      if (docData.documentTagsData) {
-        try {
-          const tagData = JSON.parse(docData.documentTagsData)
-          if (Array.isArray(tagData)) {
-            processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
-          }
-        } catch (error) {
-          if (error instanceof SyntaxError) {
-            logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
-          } else {
-            throw error
-          }
+    if (docData.documentTagsData) {
+      try {
+        const tagData = JSON.parse(docData.documentTagsData)
+        if (Array.isArray(tagData)) {
+          processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
+        }
+      } catch (error) {
+        if (error instanceof SyntaxError) {
+          logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
+        } else {
+          throw error
         }
       }
+    }
 
-      const newDocument = {
-        id: documentId,
-        knowledgeBaseId,
-        filename: docData.filename,
-        fileUrl: docData.fileUrl,
-        fileSize: docData.fileSize,
-        mimeType: docData.mimeType,
-        chunkCount: 0,
-        tokenCount: 0,
-        characterCount: 0,
-        processingStatus: 'pending' as const,
-        enabled: true,
-        uploadedAt: now,
-        tag1: processedTags.tag1 ?? docData.tag1 ?? null,
-        tag2: processedTags.tag2 ?? docData.tag2 ?? null,
-        tag3: processedTags.tag3 ?? docData.tag3 ?? null,
-        tag4: processedTags.tag4 ?? docData.tag4 ?? null,
-        tag5: processedTags.tag5 ?? docData.tag5 ?? null,
-        tag6: processedTags.tag6 ?? docData.tag6 ?? null,
-        tag7: processedTags.tag7 ?? docData.tag7 ?? null,
-        number1: processedTags.number1 ?? null,
-        number2: processedTags.number2 ?? null,
-        number3: processedTags.number3 ?? null,
-        number4: processedTags.number4 ?? null,
-        number5: processedTags.number5 ?? null,
-        date1: processedTags.date1 ?? null,
-        date2: processedTags.date2 ?? null,
-        boolean1: processedTags.boolean1 ?? null,
-        boolean2: processedTags.boolean2 ?? null,
-        boolean3: processedTags.boolean3 ?? null,
-      }
-
-      documentRecords.push(newDocument)
-      returnData.push({
-        documentId,
-        filename: docData.filename,
-        fileUrl: docData.fileUrl,
-        fileSize: docData.fileSize,
-        mimeType: docData.mimeType,
-      })
+    const newDocument = {
+      id: documentId,
+      knowledgeBaseId,
+      filename: docData.filename,
+      fileUrl: docData.fileUrl,
+      fileSize: docData.fileSize,
+      mimeType: docData.mimeType,
+      chunkCount: 0,
+      tokenCount: 0,
+      characterCount: 0,
+      processingStatus: 'pending' as const,
+      enabled: true,
+      uploadedAt: now,
+      tag1: processedTags.tag1 ?? docData.tag1 ?? null,
+      tag2: processedTags.tag2 ?? docData.tag2 ?? null,
+      tag3: processedTags.tag3 ?? docData.tag3 ?? null,
+      tag4: processedTags.tag4 ?? docData.tag4 ?? null,
+      tag5: processedTags.tag5 ?? docData.tag5 ?? null,
+      tag6: processedTags.tag6 ?? docData.tag6 ?? null,
+      tag7: processedTags.tag7 ?? docData.tag7 ?? null,
+      number1: processedTags.number1 ?? null,
+      number2: processedTags.number2 ?? null,
+      number3: processedTags.number3 ?? null,
+      number4: processedTags.number4 ?? null,
+      number5: processedTags.number5 ?? null,
+      date1: processedTags.date1 ?? null,
+      date2: processedTags.date2 ?? null,
+      boolean1: processedTags.boolean1 ?? null,
+      boolean2: processedTags.boolean2 ?? null,
+      boolean3: processedTags.boolean3 ?? null,
     }
 
-    if (documentRecords.length > 0) {
-      await tx.insert(document).values(documentRecords)
-      logger.info(
-        `[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
-      )
+    documentRecords.push(newDocument)
+    returnData.push({
+      documentId,
+      filename: docData.filename,
+      fileUrl: docData.fileUrl,
+      fileSize: docData.fileSize,
+      mimeType: docData.mimeType,
+    })
+  }
 
-      await tx
-        .update(knowledgeBase)
-        .set({ updatedAt: now })
-        .where(eq(knowledgeBase.id, knowledgeBaseId))
+  if (documentRecords.length > 0) {
+    const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId)
+    if (insertedCount === 0) {
+      throw new Error('Knowledge base not found')
     }
+    logger.info(
+      `[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}`
+    )
 
-    return returnData
-  })
+    await db
+      .update(knowledgeBase)
+      .set({ updatedAt: now })
+      .where(eq(knowledgeBase.id, knowledgeBaseId))
+  }
+
+  return returnData
 }
 
 export interface TagFilterCondition {
@@ -1297,7 +1420,7 @@ export async function createSingleDocument(
     }
   }
 
-  const newDocument = {
+  const newDocument: NewDocumentRow = {
     id: documentId,
     knowledgeBaseId,
     filename: documentData.filename,
@@ -1307,31 +1430,21 @@
     fileSize: documentData.fileSize,
     mimeType: documentData.mimeType,
     chunkCount: 0,
    tokenCount: 0,
     characterCount: 0,
+    processingStatus: 'pending',
     enabled: true,
     uploadedAt: now,
     ...processedTags,
   }
 
-  await db.transaction(async (tx) => {
-    await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
-
-    const kb = await tx
-      .select({ id: knowledgeBase.id })
-      .from(knowledgeBase)
-      .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
-      .limit(1)
-
-    if (kb.length === 0) {
-      throw new Error('Knowledge base not found')
-    }
-
-    await tx.insert(document).values(newDocument)
+  const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId)
+  if (insertedCount === 0) {
+    throw new Error('Knowledge base not found')
+  }
 
-    await tx
-      .update(knowledgeBase)
-      .set({ updatedAt: now })
-      .where(eq(knowledgeBase.id, knowledgeBaseId))
-  })
+  await db
+    .update(knowledgeBase)
+    .set({ updatedAt: now })
+    .where(eq(knowledgeBase.id, knowledgeBaseId))
 
   logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`)
   return newDocument as {
diff --git a/apps/sim/lib/workspaces/lifecycle.test.ts b/apps/sim/lib/workspaces/lifecycle.test.ts
index 070b9c4ff2..d165472830 100644
--- a/apps/sim/lib/workspaces/lifecycle.test.ts
+++ b/apps/sim/lib/workspaces/lifecycle.test.ts
@@ -55,6 +55,7 @@ describe('workspace lifecycle', () => {
     })
 
     const tx = {
+      execute: vi.fn().mockResolvedValue([]),
      select: vi.fn().mockReturnValue({
        from: vi.fn().mockReturnValue({
          where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),
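The single added line in the test above is driven by the next hunk: `archiveWorkspace` now issues raw `SET LOCAL` statements through the transaction handle, so the mocked `tx` must expose an `execute` method. A sketch of the resulting stub shape, assuming the vitest helpers already used in this file; only the `execute` line is new, the builder-chain stubs stay as they were.

```ts
import { vi } from 'vitest'

// Production code now awaits tx.execute(sql`SET LOCAL ...`) before touching
// any table, so a tx stub without `execute` would reject immediately.
// Resolving to an empty result set is enough; the setting itself has no
// observable effect the test cares about.
const tx = {
  execute: vi.fn().mockResolvedValue([]),
  select: vi.fn().mockReturnValue({
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),
    }),
  }),
  // ...remaining builder-chain stubs unchanged
}
```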
diff --git a/apps/sim/lib/workspaces/lifecycle.ts b/apps/sim/lib/workspaces/lifecycle.ts
index b0a2b0d616..975529d27d 100644
--- a/apps/sim/lib/workspaces/lifecycle.ts
+++ b/apps/sim/lib/workspaces/lifecycle.ts
@@ -49,6 +49,13 @@ export async function archiveWorkspace(
     .where(eq(workflowMcpServer.workspaceId, workspaceId))
 
   await db.transaction(async (tx) => {
+    // Workspace archival is a rare admin/cleanup operation that touches every
+    // child table; on large workspaces it can exceed the 30s session default.
+    // Override per-tx with a generous ceiling — if it ever runs longer than
+    // this something is genuinely wrong.
+    await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
+    await tx.execute(sql`SET LOCAL lock_timeout = '30s'`)
+
     await tx
       .update(knowledgeBase)
       .set({
diff --git a/packages/db/db.ts b/packages/db/db.ts
index 6868bbaeeb..264b37121d 100644
--- a/packages/db/db.ts
+++ b/packages/db/db.ts
@@ -13,6 +13,14 @@ const postgresClient = postgres(connectionString, {
   connect_timeout: 30,
   max: 30,
   onnotice: () => {},
+  // Server-side guards. lock_timeout cancels a query waiting on a row lock for
+  // >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout
+  // cancels any query running >30s. Heavy paths that legitimately need longer
+  // (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`.
+  connection: {
+    lock_timeout: 5_000,
+    statement_timeout: 30_000,
+  },
 })
 
 export const db = drizzle(postgresClient, { schema })
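Taken together, the `connection` defaults in `packages/db/db.ts` and the `SET LOCAL` statements in `archiveWorkspace` form one policy: strict server-side ceilings on every pooled session, with explicit per-transaction escape hatches. A sketch of how the override scopes, assuming the `db` instance exported by `packages/db` (the import path below is a placeholder, adjust to the real export):

```ts
import { sql } from 'drizzle-orm'
import { db } from '@sim/db' // assumption: wherever packages/db exports `db`

async function demoTimeoutScoping() {
  await db.transaction(async (tx) => {
    // SET LOCAL lasts only until this transaction commits or rolls back.
    await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
    const inTx = Array.from(await tx.execute(sql`SHOW statement_timeout`))
    console.log(inTx[0]) // '5min' while inside the transaction
  })

  // Back on the session default configured in packages/db/db.ts.
  const after = Array.from(await db.execute(sql`SHOW statement_timeout`))
  console.log(after[0]) // '30s'
}
```

Because `SET LOCAL` reverts at transaction end, the relaxed ceiling cannot leak to unrelated queries that later reuse the same pooled connection.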
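When one of the server-side guards fires, Postgres cancels the statement and the driver surfaces a `PostgresError` whose SQLSTATE distinguishes the two cases: `57014` (query_canceled) for `statement_timeout`, `55P03` (lock_not_available) for `lock_timeout`. A small standalone sketch, reusing the same connection options as `packages/db/db.ts`:

```ts
import postgres from 'postgres'

// Same server-side guards as packages/db/db.ts, on a throwaway connection.
const pg = postgres(process.env.DATABASE_URL!, {
  connection: { lock_timeout: 5_000, statement_timeout: 30_000 },
})

async function demoGuards() {
  try {
    await pg`SELECT pg_sleep(60)` // exceeds the 30s statement_timeout
  } catch (err) {
    // '57014' here; a query stuck >5s behind someone else's row lock
    // (e.g. SELECT ... FOR UPDATE) would surface '55P03' instead.
    console.log((err as { code?: string }).code)
  } finally {
    await pg.end()
  }
}
```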