Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions apps/sim/app/api/tools/file/manage/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import { ensureAbsoluteUrl } from '@/lib/core/utils/urls'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import {
fetchWorkspaceFileBuffer,
getWorkspaceFile,
getWorkspaceFileByName,
updateWorkspaceFileContent,
uploadWorkspaceFile,
} from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { getFileExtension, getMimeTypeFromExtension } from '@/lib/uploads/utils/file-utils'
import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils'

export const dynamic = 'force-dynamic'

Expand All @@ -39,7 +41,54 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}

try {
await assertActiveWorkspaceAccess(workspaceId, userId)

switch (body.operation) {
case 'get': {
const { fileId, fileInput } = body
const selectedFileId =
fileId ||
(fileInput && typeof fileInput === 'object' && !Array.isArray(fileInput)
? typeof fileInput.id === 'string'
? fileInput.id
: typeof fileInput.fileId === 'string'
? fileInput.fileId
: ''
: '')

if (!selectedFileId) {
return NextResponse.json({ success: false, error: 'File is required' }, { status: 400 })
}

const file = await getWorkspaceFile(workspaceId, selectedFileId)
if (!file) {
return NextResponse.json(
{ success: false, error: `File not found: "${selectedFileId}"` },
{ status: 404 }
)
}

logger.info('File retrieved', {
fileId: file.id,
name: file.name,
})

return NextResponse.json({
success: true,
data: {
file: {
id: file.id,
name: file.name,
url: ensureAbsoluteUrl(file.path),
size: file.size,
type: file.type,
key: file.key,
context: 'workspace',
},
},
Comment thread
Sg312 marked this conversation as resolved.
})
}

case 'write': {
const { fileName, content, contentType } = body
const mimeType = contentType || getMimeTypeFromExtension(getFileExtension(fileName))
Expand Down
166 changes: 139 additions & 27 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import { sleep } from '@sim/utils/helpers'
import { generateId } from '@sim/utils/id'
import { useQueryClient } from '@tanstack/react-query'
import { usePathname, useRouter } from 'next/navigation'
import { requestJson } from '@/lib/api/client/request'
import {
addMothershipChatResourceContract,
removeMothershipChatResourceContract,
reorderMothershipChatResourcesContract,
} from '@/lib/api/contracts/mothership-tasks'
import { cancelWorkflowExecutionContract } from '@/lib/api/contracts/workflows'
import { getMothershipAttachmentPreviewUrl } from '@/lib/copilot/chat/attachment-preview'
import { toDisplayMessage } from '@/lib/copilot/chat/display-message'
import { getLiveAssistantMessageId } from '@/lib/copilot/chat/effective-transcript'
Expand Down Expand Up @@ -1536,6 +1543,9 @@ export function useChat(
}, [])
const resourcesRef = useRef(resources)
resourcesRef.current = resources
const pendingPersistResourceKeysRef = useRef<Set<string>>(new Set())
const inFlightResourceAddsRef = useRef<Map<string, Promise<unknown>>>(new Map())
const reorderNeededAfterFlushRef = useRef(false)

// Derive the effective active resource ID — auto-selects the last resource when the stored ID is
// absent or no longer in the list, avoiding a separate Effect-based state correction loop.
Expand Down Expand Up @@ -1962,6 +1972,9 @@ export function useChat(
setTransportIdle()
setResources([])
setActiveResourceId(null)
pendingPersistResourceKeysRef.current.clear()
inFlightResourceAddsRef.current.clear()
reorderNeededAfterFlushRef.current = false
resetEphemeralPreviewState()
setMessageQueue([])
clearQueueDispatchState()
Expand All @@ -1974,6 +1987,44 @@ export function useChat(
setTransportIdle,
])

const flushPendingResources = useCallback(async (chatId: string) => {
const pendingKeys = pendingPersistResourceKeysRef.current
if (pendingKeys.size === 0) return
const flushPromises: Array<Promise<unknown>> = []
for (const resource of resourcesRef.current) {
if (resource.id === 'streaming-file') continue
const key = `${resource.type}:${resource.id}`
if (!pendingKeys.has(key)) continue
pendingKeys.delete(key)
const promise = requestJson(addMothershipChatResourceContract, {
body: { chatId, resource },
})
.catch((err) => {
pendingPersistResourceKeysRef.current.add(key)
logger.warn('Failed to flush pending resource; will retry on next hydration', err)
})
.finally(() => {
inFlightResourceAddsRef.current.delete(key)
})
inFlightResourceAddsRef.current.set(key, promise)
flushPromises.push(promise)
}
if (flushPromises.length === 0) return
await Promise.allSettled(flushPromises)
if (!reorderNeededAfterFlushRef.current) return
reorderNeededAfterFlushRef.current = false
const localOrder = resourcesRef.current.filter(
(r) =>
r.id !== 'streaming-file' && !pendingPersistResourceKeysRef.current.has(`${r.type}:${r.id}`)
)
if (localOrder.length === 0) return
requestJson(reorderMothershipChatResourcesContract, {
body: { chatId, resources: localOrder },
}).catch((err) => {
logger.warn('Failed to sync resource order after flush', err)
})
}, [])

const adoptResolvedChatId = useCallback(
(chatId: string, options?: { replaceHomeHistory?: boolean; invalidateList?: boolean }) => {
const selectedChatId = selectedChatIdRef.current
Expand All @@ -1992,8 +2043,9 @@ export function useChat(
if (options?.invalidateList) {
queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) })
}
flushPendingResources(chatId)
},
[queryClient, workspaceId]
[flushPendingResources, queryClient, workspaceId]
)

const { data: chatHistory } = useChatHistory(resolvedChatId)
Expand All @@ -2018,15 +2070,21 @@ export function useChat(
}

const persistChatId = chatIdRef.current ?? selectedChatIdRef.current
const key = `${resource.type}:${resource.id}`
if (persistChatId) {
// boundary-raw-fetch: fire-and-forget side-effect during stream lifecycle; intentionally avoids requestJson's response parsing/throw semantics so a failure here cannot interrupt the active turn
fetch('/api/mothership/chat/resources', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: persistChatId, resource }),
}).catch((err) => {
logger.warn('Failed to persist resource', err)
const promise = requestJson(addMothershipChatResourceContract, {
body: { chatId: persistChatId, resource },
})
.catch((err) => {
pendingPersistResourceKeysRef.current.add(key)
logger.warn('Failed to persist resource; will retry on next hydration', err)
})
.finally(() => {
inFlightResourceAddsRef.current.delete(key)
})
inFlightResourceAddsRef.current.set(key, promise)
} else {
pendingPersistResourceKeysRef.current.add(key)
}
return true
}, [])
Expand All @@ -2035,21 +2093,67 @@ export function useChat(
setResources((prev) => prev.filter((r) => !(r.type === resourceType && r.id === resourceId)))
setActiveResourceId((prev) => (prev === resourceId ? null : prev))

const key = `${resourceType}:${resourceId}`
const wasPending = pendingPersistResourceKeysRef.current.delete(key)
const inFlightAdd = inFlightResourceAddsRef.current.get(key)
if (wasPending && !inFlightAdd) return

const persistChatId = chatIdRef.current ?? selectedChatIdRef.current
if (persistChatId) {
// boundary-raw-fetch: fire-and-forget side-effect; intentionally avoids requestJson's response parsing/throw semantics so a transient failure cannot interrupt the caller
fetch('/api/mothership/chat/resources', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: persistChatId, resourceType, resourceId }),
if (!persistChatId) return
const fireDelete = () => {
requestJson(removeMothershipChatResourceContract, {
body: { chatId: persistChatId, resourceType, resourceId },
}).catch((err) => {
logger.warn('Failed to persist resource removal', err)
})
}
if (inFlightAdd) {
inFlightAdd.finally(fireDelete)
} else {
fireDelete()
}
}, [])

const reorderResources = useCallback((newOrder: MothershipResource[]) => {
setResources(newOrder)
const persistChatId = chatIdRef.current ?? selectedChatIdRef.current
if (!persistChatId) return
const pendingKeys = pendingPersistResourceKeysRef.current
const inFlightAdds = inFlightResourceAddsRef.current
const hasUnsyncedAdds = newOrder.some((r) => {
const key = `${r.type}:${r.id}`
return pendingKeys.has(key) || inFlightAdds.has(key)
})
if (hasUnsyncedAdds) {
reorderNeededAfterFlushRef.current = true
if (pendingKeys.size === 0 && inFlightAdds.size > 0) {
Promise.allSettled(Array.from(inFlightAdds.values())).then(() => {
if (!reorderNeededAfterFlushRef.current) return
reorderNeededAfterFlushRef.current = false
const chatId = chatIdRef.current ?? selectedChatIdRef.current
if (!chatId) return
const order = resourcesRef.current.filter(
(r) =>
r.id !== 'streaming-file' &&
!pendingPersistResourceKeysRef.current.has(`${r.type}:${r.id}`)
)
if (order.length === 0) return
requestJson(reorderMothershipChatResourcesContract, {
body: { chatId, resources: order },
}).catch((err) => {
logger.warn('Failed to sync resource order after in-flight ADDs', err)
})
})
}
return
}
const persistableResources = newOrder.filter((r) => r.id !== 'streaming-file')
if (persistableResources.length === 0) return
requestJson(reorderMothershipChatResourcesContract, {
body: { chatId: persistChatId, resources: persistableResources },
}).catch((err) => {
logger.warn('Failed to persist resource reorder', err)
})
}, [])

const ensureWorkflowToolResource = useCallback(
Expand Down Expand Up @@ -2179,6 +2283,9 @@ export function useChat(
setTransportIdle()
setResources([])
setActiveResourceId(null)
pendingPersistResourceKeysRef.current.clear()
inFlightResourceAddsRef.current.clear()
reorderNeededAfterFlushRef.current = false
resetEphemeralPreviewState()
setMessageQueue([])
clearQueueDispatchState()
Expand Down Expand Up @@ -2229,27 +2336,32 @@ export function useChat(

const hasPersistedStreamingFile = chatHistory.resources.some((r) => r.id === 'streaming-file')
if (hasPersistedStreamingFile) {
// boundary-raw-fetch: fire-and-forget cleanup during chat-history hydration; failures are silently swallowed to keep hydration non-blocking
fetch('/api/mothership/chat/resources', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
requestJson(removeMothershipChatResourceContract, {
body: {
chatId: chatHistory.id,
resourceType: 'file',
resourceId: 'streaming-file',
}),
},
}).catch(() => {})
}

flushPendingResources(chatHistory.id)

const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file')
if (persistedResources.length > 0) {
const serverKeys = new Set(persistedResources.map((r) => `${r.type}:${r.id}`))
const localOnly = resourcesRef.current.filter(
(r) => r.id !== 'streaming-file' && !serverKeys.has(`${r.type}:${r.id}`)
)
const mergedResources = [...persistedResources, ...localOnly]

if (mergedResources.length > 0) {
const hydratedActiveResourceId =
activeResourceIdRef.current &&
persistedResources.some((resource) => resource.id === activeResourceIdRef.current)
mergedResources.some((resource) => resource.id === activeResourceIdRef.current)
? activeResourceIdRef.current
: persistedResources[persistedResources.length - 1].id
: mergedResources[mergedResources.length - 1].id
activeResourceIdRef.current = hydratedActiveResourceId
setResources(persistedResources)
setResources(mergedResources)
setActiveResourceId(hydratedActiveResourceId)

for (const resource of persistedResources) {
Expand Down Expand Up @@ -2373,6 +2485,7 @@ export function useChat(
workspaceId,
cancelActiveStreamReader,
cancelActiveStreamRecovery,
flushPendingResources,
queryClient,
recoverPendingClientWorkflowTools,
seedPreviewSessions,
Expand Down Expand Up @@ -5003,9 +5116,8 @@ export function useChat(
const executionId = execState.getCurrentExecutionId(workflowId)
if (executionId) {
execState.setCurrentExecutionId(workflowId, null)
// boundary-raw-fetch: fire-and-forget execution cancellation invoked from a stop-generation barrier; failures are silently swallowed so the stop teardown cannot stall on a contract-validation throw
fetch(`/api/workflows/${workflowId}/executions/${executionId}/cancel`, {
method: 'POST',
requestJson(cancelWorkflowExecutionContract, {
params: { id: workflowId, executionId },
}).catch(() => {})
}

Expand Down
Loading
Loading