Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/realtime/src/database/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const socketDb = drizzle(
prepare: false,
idle_timeout: 10,
connect_timeout: 20,
max: 30,
max: 15,
onnotice: () => {},
}),
{ schema }
Expand Down
12 changes: 10 additions & 2 deletions apps/sim/app/api/auth/socket-token/route.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { headers } from 'next/headers'
import { NextResponse } from 'next/server'
import { type NextRequest, NextResponse } from 'next/server'
import { auth } from '@/lib/auth'
import { isAuthDisabled } from '@/lib/core/config/feature-flags'
import { enforceIpRateLimit } from '@/lib/core/rate-limiter'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

const logger = createLogger('SocketTokenAPI')

export const POST = withRouteHandler(async () => {
export const POST = withRouteHandler(async (request: NextRequest) => {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}

const rateLimited = await enforceIpRateLimit('socket-token', request, {
maxTokens: 30,
refillRate: 30,
refillIntervalMs: 60_000,
})
if (rateLimited) return rateLimited

try {
const hdrs = await headers()
const response = await auth.api.generateOneTimeToken({
Expand Down
9 changes: 9 additions & 0 deletions apps/sim/app/api/auth/sso/providers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { listSsoProvidersContract } from '@/lib/api/contracts/auth'
import { parseRequest } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { enforceIpRateLimit } from '@/lib/core/rate-limiter'
import { REDACTED_MARKER } from '@/lib/core/security/redaction'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -13,6 +14,14 @@ const logger = createLogger('SSOProvidersRoute')
export const GET = withRouteHandler(async (request: NextRequest) => {
try {
const session = await getSession()
if (!session?.user?.id) {
const rateLimited = await enforceIpRateLimit('sso-providers', request, {
maxTokens: 20,
refillRate: 20,
refillIntervalMs: 60_000,
})
if (rateLimited) return rateLimited
}
const parsed = await parseRequest(listSsoProvidersContract, request, {})
if (!parsed.success) return parsed.response
const { organizationId } = parsed.data.query
Expand Down
9 changes: 6 additions & 3 deletions apps/sim/app/api/chat/[identifier]/otp/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ describe('Chat OTP API Route', () => {
expect(headerSet).toHaveBeenCalledWith('Retry-After', '900')
})

it('skips IP rate limit when client IP is unknown', async () => {
it('folds spoofed `unknown` client IPs into a single shared bucket', async () => {
requestUtilsMockFns.mockGetClientIp.mockReturnValueOnce('unknown')
buildDeploymentSelect()

Expand All @@ -434,8 +434,11 @@ describe('Chat OTP API Route', () => {

await POST(request, { params: Promise.resolve({ identifier: mockIdentifier }) })

// Only the email-scoped check should run, not the IP-scoped one
expect(mockCheckRateLimitDirect).toHaveBeenCalledTimes(1)
expect(mockCheckRateLimitDirect).toHaveBeenCalledTimes(2)
expect(mockCheckRateLimitDirect).toHaveBeenCalledWith(
expect.stringMatching(/^chat-otp:ip:.*:unknown$/),
expect.any(Object)
)
expect(mockCheckRateLimitDirect).toHaveBeenCalledWith(
expect.stringContaining('chat-otp:email:'),
expect.any(Object)
Expand Down
24 changes: 11 additions & 13 deletions apps/sim/app/api/chat/[identifier]/otp/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,18 @@ export const POST = withRouteHandler(

try {
const ip = getClientIp(request)
if (ip !== 'unknown') {
const ipRateLimit = await rateLimiter.checkRateLimitDirect(
`chat-otp:ip:${identifier}:${ip}`,
OTP_IP_RATE_LIMIT
const ipRateLimit = await rateLimiter.checkRateLimitDirect(
`chat-otp:ip:${identifier}:${ip}`,
OTP_IP_RATE_LIMIT
)
if (!ipRateLimit.allowed) {
logger.warn(`[${requestId}] OTP IP rate limit exceeded for ${identifier} from ${ip}`)
const retryAfter = Math.ceil(
(ipRateLimit.retryAfterMs ?? OTP_IP_RATE_LIMIT.refillIntervalMs) / 1000
)
if (!ipRateLimit.allowed) {
logger.warn(`[${requestId}] OTP IP rate limit exceeded for ${identifier} from ${ip}`)
const retryAfter = Math.ceil(
(ipRateLimit.retryAfterMs ?? OTP_IP_RATE_LIMIT.refillIntervalMs) / 1000
)
const response = createErrorResponse('Too many requests. Please try again later.', 429)
response.headers.set('Retry-After', String(retryAfter))
return addCorsHeaders(response, request)
}
const response = createErrorResponse('Too many requests. Please try again later.', 429)
response.headers.set('Retry-After', String(retryAfter))
return addCorsHeaders(response, request)
}

const parsed = await parseRequest(requestChatEmailOtpContract, request, context, {
Expand Down
24 changes: 11 additions & 13 deletions apps/sim/app/api/chat/[identifier]/sso/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,18 @@ export const POST = withRouteHandler(
const requestId = generateRequestId()

const ip = getClientIp(request)
if (ip !== 'unknown') {
const ipRateLimit = await rateLimiter.checkRateLimitDirect(
`chat-sso:ip:${ip}`,
SSO_IP_RATE_LIMIT
const ipRateLimit = await rateLimiter.checkRateLimitDirect(
`chat-sso:ip:${ip}`,
SSO_IP_RATE_LIMIT
)
if (!ipRateLimit.allowed) {
logger.warn(`[${requestId}] SSO eligibility rate limit exceeded from ${ip}`)
const retryAfter = Math.ceil(
(ipRateLimit.retryAfterMs ?? SSO_IP_RATE_LIMIT.refillIntervalMs) / 1000
)
if (!ipRateLimit.allowed) {
logger.warn(`[${requestId}] SSO eligibility rate limit exceeded from ${ip}`)
const retryAfter = Math.ceil(
(ipRateLimit.retryAfterMs ?? SSO_IP_RATE_LIMIT.refillIntervalMs) / 1000
)
const response = createErrorResponse('Too many requests. Please try again later.', 429)
response.headers.set('Retry-After', String(retryAfter))
return addCorsHeaders(response, request)
}
const response = createErrorResponse('Too many requests. Please try again later.', 429)
response.headers.set('Retry-After', String(retryAfter))
return addCorsHeaders(response, request)
}

const parsed = await parseRequest(chatSSOContract, request, context)
Expand Down
6 changes: 2 additions & 4 deletions apps/sim/app/api/mcp/servers/[id]/refresh/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { db } from '@sim/db'
import { mcpServers, workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, isNull } from 'drizzle-orm'
import { and, eq, inArray, isNull } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { mcpServerIdParamsSchema } from '@/lib/api/contracts/mcp'
import { validationErrorResponse } from '@/lib/api/server'
Expand Down Expand Up @@ -77,13 +77,11 @@ async function syncToolSchemasToWorkflows(
subBlocks: workflowBlocks.subBlocks,
})
.from(workflowBlocks)
.where(eq(workflowBlocks.type, 'agent'))
.where(and(eq(workflowBlocks.type, 'agent'), inArray(workflowBlocks.workflowId, workflowIds)))

const updatedWorkflowIds = new Set<string>()

for (const block of agentBlocks) {
if (!workflowIds.includes(block.workflowId)) continue

const subBlocks = block.subBlocks as Record<string, unknown> | null
if (!subBlocks) continue

Expand Down
8 changes: 4 additions & 4 deletions apps/sim/app/api/mcp/tools/stored/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { db } from '@sim/db'
import { workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { eq } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { withMcpAuth } from '@/lib/mcp/middleware'
Expand Down Expand Up @@ -33,13 +33,13 @@ export const GET = withRouteHandler(
const agentBlocks = await db
.select({ workflowId: workflowBlocks.workflowId, subBlocks: workflowBlocks.subBlocks })
.from(workflowBlocks)
.where(eq(workflowBlocks.type, 'agent'))
.where(
and(eq(workflowBlocks.type, 'agent'), inArray(workflowBlocks.workflowId, workflowIds))
)

const storedTools: StoredMcpTool[] = []

for (const block of agentBlocks) {
if (!workflowMap.has(block.workflowId)) continue

const subBlocks = block.subBlocks as Record<string, unknown> | null
if (!subBlocks) continue

Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/api/telemetry/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { telemetryContract } from '@/lib/api/contracts/telemetry'
import { parseRequest } from '@/lib/api/server'
import { env } from '@/lib/core/config/env'
import { isProd } from '@/lib/core/config/feature-flags'
import { enforceIpRateLimit } from '@/lib/core/rate-limiter'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

const logger = createLogger('TelemetryAPI')
Expand Down Expand Up @@ -148,6 +149,13 @@ async function forwardToCollector(data: Record<string, unknown>): Promise<boolea
* Endpoint that receives telemetry events and forwards them to OpenTelemetry collector
*/
export const POST = withRouteHandler(async (req: NextRequest) => {
const rateLimited = await enforceIpRateLimit('telemetry', req, {
maxTokens: 60,
refillRate: 30,
refillIntervalMs: 60_000,
})
if (rateLimited) return rateLimited

try {
const parsed = await parseRequest(telemetryContract, req, {})
if (!parsed.success) return parsed.response
Expand Down
51 changes: 37 additions & 14 deletions apps/sim/app/api/templates/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { type NextRequest, NextResponse } from 'next/server'
import { templateIdParamsSchema, updateTemplateContract } from '@/lib/api/contracts/templates'
import { parseRequest } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { RateLimiter } from '@/lib/core/rate-limiter'
import { generateRequestId, getClientIp } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { canAccessTemplate } from '@/lib/templates/permissions'
import {
Expand All @@ -18,6 +19,18 @@ import type { WorkflowState } from '@/stores/workflows/workflow/types'

const logger = createLogger('TemplateByIdAPI')

const viewRateLimiter = new RateLimiter()

/**
* Per-IP, per-template view-counter dedup bucket: one increment per 10 minutes.
* Prevents scripted inflation of `templates.views` from the public GET handler.
*/
const TEMPLATE_VIEW_DEDUP = {
maxTokens: 1,
refillRate: 1,
refillIntervalMs: 10 * 60_000,
}

export const revalidate = 0

export const GET = withRouteHandler(
Expand Down Expand Up @@ -63,21 +76,31 @@ export const GET = withRouteHandler(
isStarred = starResult.length > 0
}

const shouldIncrementView = template.status === 'approved'
let shouldIncrementView = template.status === 'approved'

if (shouldIncrementView) {
try {
await db
.update(templates)
.set({
views: sql`${templates.views} + 1`,
})
.where(eq(templates.id, id))
} catch (viewError) {
logger.warn(
`[${requestId}] Failed to increment view count for template: ${id}`,
viewError
)
const viewer = session?.user?.id ?? `ip:${getClientIp(request)}`
const dedupKey = `template-view:${id}:${viewer}`
const { allowed } = await viewRateLimiter.checkRateLimitDirect(
dedupKey,
TEMPLATE_VIEW_DEDUP
)
if (!allowed) {
shouldIncrementView = false
} else {
try {
await db
.update(templates)
.set({
views: sql`${templates.views} + 1`,
})
.where(eq(templates.id, id))
} catch (viewError) {
logger.warn(
`[${requestId}] Failed to increment view count for template: ${id}`,
viewError
)
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/api/tools/a2a/cancel-task/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createA2AClient } from '@/lib/a2a/utils'
import { a2aCancelTaskContract } from '@/lib/api/contracts/tools/a2a'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { enforceUserOrIpRateLimit } from '@/lib/core/rate-limiter'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -29,6 +30,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
)
}

const rateLimited = await enforceUserOrIpRateLimit(
'a2a-cancel-task',
authResult.userId,
request
)
if (rateLimited) return rateLimited

const parsed = await parseRequest(
a2aCancelTaskContract,
request,
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/api/tools/a2a/delete-push-notification/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createA2AClient } from '@/lib/a2a/utils'
import { a2aDeletePushNotificationContract } from '@/lib/api/contracts/tools/a2a'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { enforceUserOrIpRateLimit } from '@/lib/core/rate-limiter'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -30,6 +31,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
)
}

const rateLimited = await enforceUserOrIpRateLimit(
'a2a-delete-push-notification',
authResult.userId,
request
)
if (rateLimited) return rateLimited

logger.info(
`[${requestId}] Authenticated A2A delete push notification request via ${authResult.authType}`,
{
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/api/tools/a2a/get-agent-card/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createA2AClient } from '@/lib/a2a/utils'
import { a2aGetAgentCardContract } from '@/lib/api/contracts/tools/a2a'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { enforceUserOrIpRateLimit } from '@/lib/core/rate-limiter'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -28,6 +29,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
)
}

const rateLimited = await enforceUserOrIpRateLimit(
'a2a-get-agent-card',
authResult.userId,
request
)
if (rateLimited) return rateLimited

logger.info(
`[${requestId}] Authenticated A2A get agent card request via ${authResult.authType}`,
{
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/api/tools/a2a/get-push-notification/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createA2AClient } from '@/lib/a2a/utils'
import { a2aGetPushNotificationContract } from '@/lib/api/contracts/tools/a2a'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { enforceUserOrIpRateLimit } from '@/lib/core/rate-limiter'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -30,6 +31,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
)
}

const rateLimited = await enforceUserOrIpRateLimit(
'a2a-get-push-notification',
authResult.userId,
request
)
if (rateLimited) return rateLimited

logger.info(
`[${requestId}] Authenticated A2A get push notification request via ${authResult.authType}`,
{
Expand Down
4 changes: 4 additions & 0 deletions apps/sim/app/api/tools/a2a/get-task/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createA2AClient } from '@/lib/a2a/utils'
import { a2aGetTaskContract } from '@/lib/api/contracts/tools/a2a'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { enforceUserOrIpRateLimit } from '@/lib/core/rate-limiter'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'

Expand All @@ -29,6 +30,9 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
)
}

const rateLimited = await enforceUserOrIpRateLimit('a2a-get-task', authResult.userId, request)
if (rateLimited) return rateLimited

logger.info(`[${requestId}] Authenticated A2A get task request via ${authResult.authType}`, {
userId: authResult.userId,
})
Expand Down
Loading
Loading