feat: record tool call usage for mcp edge requests; add graceful error handling to mcp edge tool call requests

pull/715/head
Travis Fischer 2025-06-11 09:02:51 +07:00
rodzic f92e448eb2
commit afc8e49044
14 zmienionych plików z 316 dodań i 116 usunięć

Wyświetl plik

@ -37,6 +37,7 @@
"@agentic/platform-validators": "workspace:*",
"@hono/zod-validator": "catalog:",
"@modelcontextprotocol/sdk": "catalog:",
"@sentry/cloudflare": "catalog:",
"agents": "^0.0.95",
"fast-content-type-parse": "catalog:",
"hono": "catalog:",

Wyświetl plik

@ -4,7 +4,6 @@ import {
cors,
errorHandler,
init,
responseTime,
sentry
} from '@agentic/platform-hono'
import { parseToolIdentifier } from '@agentic/platform-validators'
@ -13,7 +12,7 @@ import { Hono } from 'hono'
import type { GatewayHonoEnv } from './lib/types'
import { createAgenticClient } from './lib/agentic-client'
import { createHttpResponseFromMcpToolCallResponse } from './lib/create-http-response-from-mcp-tool-call-response'
import { reportToolCallUsage } from './lib/report-tool-call-usage'
import { recordToolCallUsage } from './lib/record-tool-call-usage'
import { resolveHttpEdgeRequest } from './lib/resolve-http-edge-request'
import { resolveMcpEdgeRequest } from './lib/resolve-mcp-edge-request'
import { resolveOriginToolCall } from './lib/resolve-origin-tool-call'
@ -46,8 +45,6 @@ app.use(init)
// Wrangler does this for us. TODO: Does this happen on prod?
// app.use(accessLogger)
app.use(responseTime)
app.all(async (ctx) => {
const gatewayStartTimeMs = Date.now()
ctx.set('cache', caches.default)
@ -80,8 +77,6 @@ app.all(async (ctx) => {
const resolvedHttpEdgeRequest = await resolveHttpEdgeRequest(ctx)
const originStartTimeMs = Date.now()
const resolvedOriginToolCallResult = await resolveOriginToolCall({
tool: resolvedHttpEdgeRequest.tool,
args: resolvedHttpEdgeRequest.toolCallArgs,
@ -117,22 +112,22 @@ app.all(async (ctx) => {
}
// Record the time it took for the origin to respond.
const now = Date.now()
const originTimespanMs = now - originStartTimeMs
res.headers.set('x-origin-response-time', `${originTimespanMs}ms`)
res.headers.set(
'x-origin-response-time',
`${resolvedOriginToolCallResult.originTimespanMs}ms`
)
const gatewayTimespanMs = now - gatewayStartTimeMs
// Record the time it took for the gateway to respond.
const gatewayTimespanMs = Date.now() - gatewayStartTimeMs
res.headers.set('x-response-time', `${gatewayTimespanMs}ms`)
reportToolCallUsage({
recordToolCallUsage({
...resolvedHttpEdgeRequest,
requestMode: 'http',
resolvedOriginToolCallResult,
sessionId: ctx.get('sessionId')!,
requestId: ctx.get('requestId')!,
ip: ctx.get('ip'),
originTimespanMs,
gatewayTimespanMs,
env: ctx.env,
waitUntil: ctx.executionCtx.waitUntil.bind(ctx.executionCtx)
})

Wyświetl plik

@ -1,8 +1,10 @@
import { assert } from '@agentic/platform-core'
import { Client as McpClient } from '@modelcontextprotocol/sdk/client/index.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import * as Sentry from '@sentry/cloudflare'
import { DurableObject } from 'cloudflare:workers'
import type { RawEnv } from './env'
import type { AgenticMcpRequestMetadata } from './types'
export type DurableMcpClientInfo = {
@ -16,7 +18,7 @@ export type DurableMcpClientInfo = {
// customer<>DurableMcpClientInfo connection?
// Currently using `sessionId`
export class DurableMcpClient extends DurableObject {
export class DurableMcpClientBase extends DurableObject<RawEnv> {
protected client?: McpClient
protected clientConnectionP?: Promise<void>
@ -82,3 +84,12 @@ export class DurableMcpClient extends DurableObject {
return JSON.stringify(toolCallResponse)
}
}
export const DurableMcpClient = Sentry.instrumentDurableObjectWithSentry(
(env: RawEnv) => ({
dsn: env.SENTRY_DSN,
environment: env.ENVIRONMENT,
integrations: [Sentry.extraErrorDataIntegration()]
}),
DurableMcpClientBase
)

Wyświetl plik

@ -6,14 +6,21 @@ import {
CallToolRequestSchema,
ListToolsRequestSchema
} from '@modelcontextprotocol/sdk/types.js'
import * as Sentry from '@sentry/cloudflare'
import { McpAgent } from 'agents/mcp'
import type { RawEnv } from './env'
import type { AdminConsumer } from './types'
import type {
AdminConsumer,
McpToolCallResponse,
ResolvedOriginToolCallResult
} from './types'
import { handleMcpToolCallError } from './handle-mcp-tool-call-error'
import { recordToolCallUsage } from './record-tool-call-usage'
import { resolveOriginToolCall } from './resolve-origin-tool-call'
import { transformHttpResponseToMcpToolCallResponse } from './transform-http-response-to-mcp-tool-call-response'
export class DurableMcpServer extends McpAgent<
export class DurableMcpServerBase extends McpAgent<
RawEnv,
never, // TODO: do we need local state?
{
@ -26,6 +33,11 @@ export class DurableMcpServer extends McpAgent<
protected _serverP = Promise.withResolvers<Server>()
override server = this._serverP.promise
// NOTE: This empty constructor is required for the Sentry wrapper to work.
public constructor(state: DurableObjectState, env: RawEnv) {
super(state, env)
}
override async init() {
const { consumer, deployment, pricingPlan, ip } = this.props
const { projectIdentifier } = parseDeploymentIdentifier(
@ -73,22 +85,17 @@ export class DurableMcpServer extends McpAgent<
}))
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params
const tool = tools.find((tool) => tool.name === name)
const { name: toolName, arguments: args } = request.params
const sessionId = this.ctx.id.toString()
const tool = tools.find((tool) => tool.name === toolName)
let resolvedOriginToolCallResult: ResolvedOriginToolCallResult | undefined
let toolCallResponse: McpToolCallResponse | undefined
try {
assert(tool, 404, `Unknown tool "${name}"`)
assert(tool, 404, `Unknown tool "${toolName}"`)
// TODO: usage tracking / reporting
const sessionId = this.ctx.id.toString()
const {
toolCallArgs,
originRequest,
originResponse,
toolCallResponse,
rateLimitResult
} = await resolveOriginToolCall({
resolvedOriginToolCallResult = await resolveOriginToolCall({
tool,
args,
deployment,
@ -100,39 +107,72 @@ export class DurableMcpServer extends McpAgent<
waitUntil: this.ctx.waitUntil.bind(this.ctx)
})
const {
originResponse,
toolCallResponse: resolvedToolCallResponse,
rateLimitResult
} = resolvedOriginToolCallResult
if (originResponse) {
return transformHttpResponseToMcpToolCallResponse({
originRequest,
originResponse,
toolCallResponse = await transformHttpResponseToMcpToolCallResponse({
tool,
toolCallArgs,
rateLimitResult
...resolvedOriginToolCallResult
})
} else if (toolCallResponse) {
if (toolCallResponse._meta || rateLimitResult) {
return {
...toolCallResponse,
} else if (resolvedToolCallResponse) {
if (resolvedToolCallResponse._meta || rateLimitResult) {
toolCallResponse = {
...resolvedToolCallResponse,
_meta: {
...toolCallResponse._meta,
...resolvedToolCallResponse._meta,
...(rateLimitResult
? getRateLimitHeaders(rateLimitResult)
: undefined)
}
}
} else {
return toolCallResponse
toolCallResponse = resolvedToolCallResponse
}
} else {
assert(false, 500)
}
assert(toolCallResponse, 500, 'Missing tool call response')
return toolCallResponse
} catch (err: unknown) {
// TODO: handle errors
// eslint-disable-next-line no-console
console.error(err)
throw err
// Gracefully handle tool call exceptions, whether they're thrown by the
// origin or internally by the gateway.
toolCallResponse = handleMcpToolCallError(err, {
deployment,
consumer,
toolName,
sessionId,
env: this.env
})
return toolCallResponse
} finally {
// TODO: report usage
// Record tool call usage, whether the call was successful or not.
recordToolCallUsage({
...this.props,
requestMode: 'mcp',
tool,
resolvedOriginToolCallResult,
sessionId,
// TODO: requestId
ip,
env: this.env,
waitUntil: this.ctx.waitUntil.bind(this.ctx)
})
}
})
}
}
export const DurableMcpServer = Sentry.instrumentDurableObjectWithSentry(
(env: RawEnv) => ({
dsn: env.SENTRY_DSN,
environment: env.ENVIRONMENT,
integrations: [Sentry.extraErrorDataIntegration()]
}),
DurableMcpServerBase
)

Wyświetl plik

@ -10,9 +10,6 @@ import {
} from '@agentic/platform-hono'
import { z } from 'zod'
import type { DurableMcpClient } from './durable-mcp-client'
import type { DurableRateLimiter } from './rate-limits/durable-rate-limiter'
export const envSchema = baseEnvSchema
.extend({
AGENTIC_API_BASE_URL: z.string().url(),
@ -20,15 +17,15 @@ export const envSchema = baseEnvSchema
STRIPE_SECRET_KEY: z.string().nonempty(),
DO_RATE_LIMITER: z.custom<DurableObjectNamespace<DurableRateLimiter>>(
(ns) => isDurableObjectNamespace(ns)
DO_RATE_LIMITER: z.custom<DurableObjectNamespace>((ns) =>
isDurableObjectNamespace(ns)
),
DO_MCP_SERVER: z.custom<DurableObjectNamespace>((ns) =>
isDurableObjectNamespace(ns)
),
DO_MCP_CLIENT: z.custom<DurableObjectNamespace<DurableMcpClient>>((ns) =>
DO_MCP_CLIENT: z.custom<DurableObjectNamespace>((ns) =>
isDurableObjectNamespace(ns)
),

Wyświetl plik

@ -0,0 +1,97 @@
import type { AdminDeployment } from '@agentic/platform-types'
import type { ContentfulStatusCode } from 'hono/utils/http-status'
import { HttpError, pruneEmpty } from '@agentic/platform-core'
import * as Sentry from '@sentry/cloudflare'
import { HTTPException } from 'hono/http-exception'
import { HTTPError } from 'ky'
import type { RawEnv } from './env'
import type { AdminConsumer, McpToolCallResponse } from './types'
export function handleMcpToolCallError(
err: any,
{
deployment,
consumer,
toolName,
sessionId,
requestId,
env
}: {
deployment: AdminDeployment
consumer?: AdminConsumer
toolName: string
sessionId: string
requestId?: string
env: RawEnv
}
): McpToolCallResponse {
let message = 'Internal Server Error'
let status: ContentfulStatusCode = 500
const res: McpToolCallResponse = {
_meta: pruneEmpty({
deploymentId: deployment.id,
consumerId: consumer?.id,
toolName,
sessionId,
requestId
}),
isError: true,
content: [
{
type: 'text',
text: message
}
]
}
const isProd = env.ENVIRONMENT === 'production'
if (err instanceof HttpError) {
message = err.message
status = err.statusCode as ContentfulStatusCode
// This is where rate-limit headers will be set, since `RateLimitError`
// is a subclass of `HttpError`.
if (err.headers) {
for (const [key, value] of Object.entries(err.headers)) {
res._meta![key] = value
}
}
} else if (err instanceof HTTPException) {
message = err.message
status = err.status
} else if (err instanceof HTTPError) {
message = err.message
status = err.response.status as ContentfulStatusCode
} else if (!isProd && err.message) {
message = err.message
}
if (!Number.isSafeInteger(status)) {
status = 500
}
if (status === 500) {
// eslint-disable-next-line no-console
console.error(`mcp tool call "${toolName}" error`, status, err)
if (isProd) {
Sentry.captureException(err)
}
} else {
// eslint-disable-next-line no-console
console.warn(`mcp tool call "${toolName}" warning`, status, message, err)
}
res._meta!.status = status
res.content = [
{
type: 'text',
text: message
}
]
return res
}

Wyświetl plik

@ -1,13 +1,15 @@
import type { SetOptional } from 'type-fest'
import * as Sentry from '@sentry/cloudflare'
import { DurableObject } from 'cloudflare:workers'
import type { RawEnv } from '../env'
import type { RateLimitState } from '../types'
const initialState: SetOptional<RateLimitState, 'resetTimeMs'> = {
current: 0
}
export class DurableRateLimiter extends DurableObject {
export class DurableRateLimiterBase extends DurableObject<RawEnv> {
async update({
intervalMs,
cost = 1
@ -44,3 +46,12 @@ export class DurableRateLimiter extends DurableObject {
await this.reset()
}
}
export const DurableRateLimiter = Sentry.instrumentDurableObjectWithSentry(
(env: RawEnv) => ({
dsn: env.SENTRY_DSN,
environment: env.ENVIRONMENT,
integrations: [Sentry.extraErrorDataIntegration()]
}),
DurableRateLimiterBase
)

Wyświetl plik

@ -7,6 +7,7 @@ import type {
RateLimitState,
WaitUntil
} from '../types'
import type { DurableRateLimiterBase } from './durable-rate-limiter'
/**
* This maps persists across worker executions and is used for caching active
@ -98,10 +99,12 @@ export async function enforceRateLimit({
}
}
const did = env.DO_RATE_LIMITER.idFromName(id)
const obj = env.DO_RATE_LIMITER.get(did)
const durableRateLimiterId = env.DO_RATE_LIMITER.idFromName(id)
const durableRateLimiter = env.DO_RATE_LIMITER.get(
durableRateLimiterId
) as DurableObjectStub<DurableRateLimiterBase>
const updatedRateLimitStateP = obj.update({ cost, intervalMs })
const updatedRateLimitStateP = durableRateLimiter.update({ cost, intervalMs })
if (async) {
waitUntil(

Wyświetl plik

@ -29,48 +29,52 @@ import { createStripe } from './external/stripe'
*
* @see https://developers.cloudflare.com/analytics/analytics-engine/limits/
*/
export function reportToolCallUsage({
export function recordToolCallUsage({
requestMode,
tool,
deployment,
consumer,
tool,
resolvedOriginToolCallResult,
ip,
sessionId,
originTimespanMs,
gatewayTimespanMs,
requestId,
env,
waitUntil
}: {
requestMode: RequestMode
tool: Tool
deployment: AdminDeployment
consumer?: AdminConsumer
pricingPlan?: PricingPlan
resolvedOriginToolCallResult: ResolvedOriginToolCallResult
tool?: Tool
resolvedOriginToolCallResult?: ResolvedOriginToolCallResult
ip?: string
sessionId: string
requestId: string
originTimespanMs: number
gatewayTimespanMs: number
requestId?: string
env: RawEnv
waitUntil: WaitUntil
}): void {
const { projectId } = deployment
const {
rateLimitResult,
cacheStatus,
originResponse,
originTimespanMs,
toolCallResponse,
toolCallArgs,
numRequestsCost,
reportUsage
} = resolvedOriginToolCallResult
const { projectId } = deployment
} = resolvedOriginToolCallResult ?? {
numRequestsCost: 0,
reportUsage: false
}
const requestSize = JSON.stringify(toolCallArgs).length
const responseSize =
Number.parseInt(originResponse?.headers.get('content-length') ?? '0') ||
JSON.stringify(toolCallResponse).length
const requestSize = resolvedOriginToolCallResult
? JSON.stringify(toolCallArgs).length
: 0
const responseSize = resolvedOriginToolCallResult
? Number.parseInt(originResponse?.headers.get('content-length') ?? '0') ||
JSON.stringify(toolCallResponse).length
: 0
// The string dimensions used for grouping and filtering (sometimes called
// labels in other metrics systems).
@ -84,7 +88,7 @@ export function reportToolCallUsage({
deployment.id,
// Name of the tool that was called
tool.name,
tool?.name ?? null,
// Whether this request was made via MCP or HTTP
requestMode,
@ -102,24 +106,27 @@ export function reportToolCallUsage({
consumer?.stripeStatus ?? null,
// Whether the request was rate-limited
rateLimitResult?.passed ? 'rl-passed' : 'rl-exceeded',
resolvedOriginToolCallResult
? rateLimitResult?.passed
? 'rl-passed'
: 'rl-exceeded'
: null,
// Whether the request hit the cache
cacheStatus,
cacheStatus ?? null,
// Response status
originResponse?.status?.toString() ||
(toolCallResponse ? (toolCallResponse.isError ? 'error' : '200') : null)
resolvedOriginToolCallResult
? originResponse?.status?.toString() ||
(toolCallResponse ? (toolCallResponse.isError ? 'error' : '200') : null)
: 'error'
]
// Numberic values to record in this data point.
// NOTE: It is important that the ordering of these fields remains consistent!
const doubles = [
// Origin timespan in milliseconds
originTimespanMs,
// Gateway timespan in milliseconds
gatewayTimespanMs,
originTimespanMs ?? 0,
// Request bandwidth in bytes
requestSize,
@ -129,7 +136,10 @@ export function reportToolCallUsage({
// Total bandwidth in bytes
// TODO: Correctly calculate total bandwidth using `content-length`
requestSize + responseSize
requestSize + responseSize,
// Number of requests cost
numRequestsCost ?? 0
]
// Cloudflare Analytics Engine only supports writing a single index at a time,
@ -162,7 +172,9 @@ export function reportToolCallUsage({
const pricingPlanLineItemSlug = 'requests'
const eventName = `meter-${projectId}-${pricingPlanLineItemSlug}`
const identifier = `${requestId}:${consumer.id}:${tool.name}`
const identifier = requestId
? `${requestId}:${consumer.id}:${tool?.name || 'unknown-tool'}`
: undefined
// Report usage to Stripe asynchronously.
waitUntil(
@ -170,7 +182,7 @@ export function reportToolCallUsage({
event_name: eventName,
identifier,
payload: {
value: '1',
value: numRequestsCost.toString(),
stripe_customer_id: consumer._stripeCustomerId
}
})

Wyświetl plik

@ -4,9 +4,11 @@ import type {
RateLimit,
Tool
} from '@agentic/platform-types'
import type { DurableObjectStub } from '@cloudflare/workers-types'
import { assert, RateLimitError } from '@agentic/platform-core'
import { parseDeploymentIdentifier } from '@agentic/platform-validators'
import type { DurableMcpClientBase } from './durable-mcp-client'
import type { RawEnv } from './env'
import type {
AdminConsumer,
@ -54,6 +56,8 @@ export async function resolveOriginToolCall({
// be rate-limited / cached / tracked / etc.
const { originAdapter } = deployment
// TODO: make this configurable via `ToolConfig.cost`
const numRequestsCost = 1
let rateLimitResult: RateLimitResult | undefined
let rateLimit: RateLimit | undefined | null
let reportUsage = true
@ -143,11 +147,14 @@ export async function resolveOriginToolCall({
}
if (rateLimit) {
// TODO: Consider decrementing rate limit if the response is cached or
// errors? this doesn't seem too important, so will leave as-is for now.
rateLimitResult = await enforceRateLimit({
id: consumer?.id ?? ip ?? sessionId,
interval: rateLimit.interval,
maxPerInterval: rateLimit.maxPerInterval,
async: rateLimit.async,
cost: numRequestsCost,
env,
waitUntil
})
@ -169,6 +176,8 @@ export async function resolveOriginToolCall({
strictAdditionalProperties: true
})
const originStartTimeMs = Date.now()
if (originAdapter.type === 'openapi') {
const operation = originAdapter.toolToOperationMap[tool.name]
assert(operation, 404, `Tool "${tool.name}" not found in OpenAPI spec`)
@ -201,7 +210,9 @@ export async function resolveOriginToolCall({
rateLimitResult,
toolCallArgs,
originRequest,
originResponse
originResponse,
originTimespanMs: Date.now() - originStartTimeMs,
numRequestsCost
}
} else if (originAdapter.type === 'mcp') {
const { projectIdentifier } = parseDeploymentIdentifier(
@ -210,7 +221,9 @@ export async function resolveOriginToolCall({
)
const id = env.DO_MCP_CLIENT.idFromName(sessionId)
const originMcpClient = env.DO_MCP_CLIENT.get(id)
const originMcpClient = env.DO_MCP_CLIENT.get(
id
) as DurableObjectStub<DurableMcpClientBase>
await originMcpClient.init({
url: deployment.originUrl,
@ -263,7 +276,9 @@ export async function resolveOriginToolCall({
reportUsage,
rateLimitResult,
toolCallArgs,
toolCallResponse: (await response.json()) as McpToolCallResponse
toolCallResponse: (await response.json()) as McpToolCallResponse,
originTimespanMs: Date.now() - originStartTimeMs,
numRequestsCost
}
}
}
@ -294,10 +309,16 @@ export async function resolveOriginToolCall({
reportUsage,
rateLimitResult,
toolCallArgs,
toolCallResponse
toolCallResponse,
originTimespanMs: Date.now() - originStartTimeMs,
numRequestsCost
}
} else {
assert(false, 500)
assert(
false,
500,
`Internal error: origin adapter type "${(originAdapter as any).type}"`
)
}
}
}

Wyświetl plik

@ -67,6 +67,8 @@ export type ResolvedOriginToolCallResult = {
rateLimitResult?: RateLimitResult
cacheStatus: CacheStatus
reportUsage: boolean
originTimespanMs: number
numRequestsCost: number
} & (
| {
originRequest: Request

Wyświetl plik

@ -1,8 +1,7 @@
// import { parseToolIdentifier } from '@agentic/platform-validators'
import * as Sentry from '@sentry/cloudflare'
import { app } from './app'
// import { DurableMcpServer } from './lib/durable-mcp-server'
import { type Env, parseEnv } from './lib/env'
import { type Env, parseEnv, type RawEnv } from './lib/env'
// Export Durable Objects for cloudflare
export { DurableMcpClient } from './lib/durable-mcp-client'
@ -10,33 +9,40 @@ export { DurableMcpServer } from './lib/durable-mcp-server'
export { DurableRateLimiter } from './lib/rate-limits/durable-rate-limiter'
// Main worker entrypoint
export default {
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
let parsedEnv: Env
export default Sentry.withSentry(
(env: RawEnv) => ({
dsn: env.SENTRY_DSN,
environment: env.ENVIRONMENT,
integrations: [Sentry.extraErrorDataIntegration()]
}),
{
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
let parsedEnv: Env
// Validate the environment
try {
parsedEnv = parseEnv(env)
} catch (err: any) {
// eslint-disable-next-line no-console
console.error('error api gateway invalid env:', err.message)
// Validate the environment
try {
parsedEnv = parseEnv(env)
} catch (err: any) {
// eslint-disable-next-line no-console
console.error('error api gateway invalid env:', err.message)
return new Response(
JSON.stringify({ error: 'Invalid api gateway environment' }),
{
status: 500,
headers: {
'content-type': 'application/json'
return new Response(
JSON.stringify({ error: 'Invalid api gateway environment' }),
{
status: 500,
headers: {
'content-type': 'application/json'
}
}
}
)
}
)
}
// Handle the request with `hono`
return app.fetch(request, parsedEnv, ctx)
}
} satisfies ExportedHandler<Env>
// Handle the request with `hono`
return app.fetch(request, parsedEnv, ctx)
}
} satisfies ExportedHandler<Env>
)

Wyświetl plik

@ -433,6 +433,9 @@ importers:
'@modelcontextprotocol/sdk':
specifier: 'catalog:'
version: 1.12.1
'@sentry/cloudflare':
specifier: 'catalog:'
version: 9.26.0(@cloudflare/workers-types@4.20250604.0)
agents:
specifier: ^0.0.95
version: 0.0.95(@cloudflare/workers-types@4.20250604.0)(react@19.1.0)

Wyświetl plik

@ -76,6 +76,7 @@
- signed requests
- add support for custom headers on responses
- add ability to only report stripe usage on non-cached requests
- add support for ToolConfig.cost defaulting to 1, to easily support tools which cost multiple "credits"
- `@agentic/platform-hono`
- fix sentry middleware
- https://github.com/honojs/middleware/blob/main/packages/sentry/src/index.ts