feat: improve recording tool usage for http gateway requests

pull/715/head
Travis Fischer 2025-06-11 10:00:50 +07:00
rodzic 4614c2322e
commit add1a43cd7
6 zmienionych plików z 166 dodań i 100 usunięć

Wyświetl plik

@ -4,16 +4,20 @@ import {
cors,
errorHandler,
init,
responseTime,
sentry
} from '@agentic/platform-hono'
import { parseToolIdentifier } from '@agentic/platform-validators'
import { Hono } from 'hono'
import type { GatewayHonoEnv } from './lib/types'
import type { GatewayHonoEnv, ResolvedOriginToolCallResult } from './lib/types'
import { createAgenticClient } from './lib/agentic-client'
import { createHttpResponseFromMcpToolCallResponse } from './lib/create-http-response-from-mcp-tool-call-response'
import { recordToolCallUsage } from './lib/record-tool-call-usage'
import { resolveHttpEdgeRequest } from './lib/resolve-http-edge-request'
import {
type ResolvedHttpEdgeRequest,
resolveHttpEdgeRequest
} from './lib/resolve-http-edge-request'
import { resolveMcpEdgeRequest } from './lib/resolve-mcp-edge-request'
import { resolveOriginToolCall } from './lib/resolve-origin-tool-call'
import { isRequestPubliclyCacheable } from './lib/utils'
@ -45,15 +49,17 @@ app.use(init)
// Wrangler does this for us. TODO: Does this happen on prod?
// app.use(accessLogger)
app.use(responseTime)
app.all(async (ctx) => {
const gatewayStartTimeMs = Date.now()
const waitUntil = ctx.executionCtx.waitUntil.bind(ctx.executionCtx)
ctx.set('cache', caches.default)
ctx.set(
'client',
createAgenticClient({
env: ctx.env,
cache: caches.default,
waitUntil: ctx.executionCtx.waitUntil.bind(ctx.executionCtx),
waitUntil,
isCachingEnabled: isRequestPubliclyCacheable(ctx.req.raw)
})
)
@ -75,73 +81,83 @@ app.all(async (ctx) => {
}).fetch(ctx.req.raw, ctx.env, executionCtx)
}
const resolvedHttpEdgeRequest = await resolveHttpEdgeRequest(ctx)
const resolvedOriginToolCallResult = await resolveOriginToolCall({
tool: resolvedHttpEdgeRequest.tool,
args: resolvedHttpEdgeRequest.toolCallArgs,
deployment: resolvedHttpEdgeRequest.deployment,
consumer: resolvedHttpEdgeRequest.consumer,
pricingPlan: resolvedHttpEdgeRequest.pricingPlan,
cacheControl: resolvedHttpEdgeRequest.cacheControl,
sessionId: ctx.get('sessionId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil: ctx.executionCtx.waitUntil.bind(ctx.executionCtx)
})
let resolvedHttpEdgeRequest: ResolvedHttpEdgeRequest | undefined
let resolvedOriginToolCallResult: ResolvedOriginToolCallResult | undefined
let originResponse: Response | undefined
if (resolvedOriginToolCallResult.originResponse) {
originResponse = resolvedOriginToolCallResult.originResponse
} else {
originResponse = await createHttpResponseFromMcpToolCallResponse(ctx, {
tool: resolvedHttpEdgeRequest.tool,
deployment: resolvedHttpEdgeRequest.deployment,
toolCallResponse: resolvedOriginToolCallResult.toolCallResponse
})
let res: Response | undefined
/**
 * Produces the response that the gateway hands back to the caller.
 *
 * The given response is first copied into a new `Response` (same body, status
 * and headers), and all header changes are applied to that copy:
 *
 * - if an origin tool call result has been resolved, its rate-limit headers
 *   (when a rate-limit result exists) and the `x-origin-response-time` header
 *   are added;
 * - the `server` header is forced to `agentic`;
 * - a fixed set of extra headers is stripped.
 *
 * @param response - The origin response or error response to decorate.
 * @returns The decorated copy of `response`.
 */
function updateResponse(response: Response) {
  const updated = new Response(response.body, response)
  const originResult = resolvedOriginToolCallResult

  if (originResult) {
    const { rateLimitResult } = originResult
    if (rateLimitResult) {
      applyRateLimitHeaders({ res: updated, rateLimitResult })
    }

    // Record the time it took for the origin to respond.
    const originTime = `${originResult.originTimespanMs}ms`
    updated.headers.set('x-origin-response-time', originTime)
  }

  // Reset server to Agentic because Cloudflare likes to override things
  updated.headers.set('server', 'agentic')

  // Remove extra Cloudflare headers
  const headersToStrip = [
    'x-powered-by',
    'via',
    'nel',
    'report-to',
    'server-timing',
    'reporting-endpoints'
  ]
  for (const headerName of headersToStrip) {
    updated.headers.delete(headerName)
  }

  return updated
}
assert(originResponse, 500, 'Origin response is required')
const res = new Response(originResponse.body, originResponse)
try {
resolvedHttpEdgeRequest = await resolveHttpEdgeRequest(ctx)
if (resolvedOriginToolCallResult.rateLimitResult) {
applyRateLimitHeaders({
res,
rateLimitResult: resolvedOriginToolCallResult.rateLimitResult
resolvedOriginToolCallResult = await resolveOriginToolCall({
...resolvedHttpEdgeRequest,
args: resolvedHttpEdgeRequest.toolCallArgs,
sessionId: ctx.get('sessionId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil
})
if (resolvedOriginToolCallResult.originResponse) {
originResponse = resolvedOriginToolCallResult.originResponse
} else {
originResponse = await createHttpResponseFromMcpToolCallResponse(ctx, {
...resolvedHttpEdgeRequest,
toolCallResponse: resolvedOriginToolCallResult.toolCallResponse
})
}
assert(originResponse, 500, 'Origin response is required')
res = updateResponse(originResponse)
return res
} catch (err: any) {
res = updateResponse(errorHandler(err, ctx))
return res
} finally {
if (resolvedHttpEdgeRequest && res) {
recordToolCallUsage({
...resolvedHttpEdgeRequest,
requestMode: 'http',
httpResponse: res,
resolvedOriginToolCallResult,
sessionId: ctx.get('sessionId')!,
requestId: ctx.get('requestId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil
})
}
}
// Record the time it took for the origin to respond.
res.headers.set(
'x-origin-response-time',
`${resolvedOriginToolCallResult.originTimespanMs}ms`
)
// Record the time it took for the gateway to respond.
const gatewayTimespanMs = Date.now() - gatewayStartTimeMs
res.headers.set('x-response-time', `${gatewayTimespanMs}ms`)
recordToolCallUsage({
...resolvedHttpEdgeRequest,
requestMode: 'http',
resolvedOriginToolCallResult,
sessionId: ctx.get('sessionId')!,
requestId: ctx.get('requestId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil: ctx.executionCtx.waitUntil.bind(ctx.executionCtx)
})
// Reset server to Agentic because Cloudflare likes to override things
res.headers.set('server', 'agentic')
// Remove extra Cloudflare headers
res.headers.delete('x-powered-by')
res.headers.delete('via')
res.headers.delete('nel')
res.headers.delete('report-to')
res.headers.delete('server-timing')
res.headers.delete('reporting-endpoints')
return res
})

Wyświetl plik

@ -1,5 +1,5 @@
import type { AdminDeployment, PricingPlan } from '@agentic/platform-types'
import { assert, getRateLimitHeaders } from '@agentic/platform-core'
import { assert, getRateLimitHeaders, pruneEmpty } from '@agentic/platform-core'
import { parseDeploymentIdentifier } from '@agentic/platform-validators'
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
import {
@ -124,9 +124,11 @@ export class DurableMcpServerBase extends McpAgent<
...resolvedToolCallResponse,
_meta: {
...resolvedToolCallResponse._meta,
...(rateLimitResult
? getRateLimitHeaders(rateLimitResult)
: undefined)
...pruneEmpty({
headers: rateLimitResult
? getRateLimitHeaders(rateLimitResult)
: undefined
})
}
}
} else {
@ -156,6 +158,7 @@ export class DurableMcpServerBase extends McpAgent<
...this.props,
requestMode: 'mcp',
tool,
mcpToolCallResponse: toolCallResponse!,
resolvedOriginToolCallResult,
sessionId,
// TODO: requestId

Wyświetl plik

@ -1,13 +1,27 @@
import type { AdminDeployment } from '@agentic/platform-types'
import type { ContentfulStatusCode } from 'hono/utils/http-status'
import { HttpError, pruneEmpty } from '@agentic/platform-core'
import {
getRateLimitHeaders,
HttpError,
pruneEmpty
} from '@agentic/platform-core'
import * as Sentry from '@sentry/cloudflare'
import { HTTPException } from 'hono/http-exception'
import { HTTPError } from 'ky'
import type { RawEnv } from './env'
import type { AdminConsumer, McpToolCallResponse } from './types'
import type {
AdminConsumer,
McpToolCallResponse,
RateLimitResult
} from './types'
/**
* Turns a thrown error into an MCP error tool call response, and attempts to
* capture as much context as possible for potential debugging.
*
* @note This function is synchronous and must never throw.
*/
export function handleMcpToolCallError(
err: any,
{
@ -16,6 +30,7 @@ export function handleMcpToolCallError(
toolName,
sessionId,
requestId,
rateLimitResult,
env
}: {
deployment: AdminDeployment
@ -23,9 +38,11 @@ export function handleMcpToolCallError(
toolName: string
sessionId: string
requestId?: string
rateLimitResult?: RateLimitResult
env: RawEnv
}
): McpToolCallResponse {
const isProd = env.ENVIRONMENT === 'production'
let message = 'Internal Server Error'
let status: ContentfulStatusCode = 500
@ -35,7 +52,10 @@ export function handleMcpToolCallError(
consumerId: consumer?.id,
toolName,
sessionId,
requestId
requestId,
headers: rateLimitResult
? getRateLimitHeaders(rateLimitResult)
: undefined
}),
isError: true,
content: [
@ -46,8 +66,6 @@ export function handleMcpToolCallError(
]
}
const isProd = env.ENVIRONMENT === 'production'
if (err instanceof HttpError) {
message = err.message
status = err.statusCode as ContentfulStatusCode
@ -78,7 +96,12 @@ export function handleMcpToolCallError(
console.error(`mcp tool call "${toolName}" error`, status, err)
if (isProd) {
Sentry.captureException(err)
try {
Sentry.captureException(err)
} catch (err_) {
// eslint-disable-next-line no-console
console.error('Error Sentry.captureException failed', err, err_)
}
}
} else {
// eslint-disable-next-line no-console

Wyświetl plik

@ -7,6 +7,7 @@ import type {
import type { RawEnv } from './env'
import type {
AdminConsumer,
McpToolCallResponse,
RequestMode,
ResolvedOriginToolCallResult,
WaitUntil
@ -35,6 +36,8 @@ export function recordToolCallUsage({
consumer,
tool,
resolvedOriginToolCallResult,
httpResponse,
mcpToolCallResponse,
ip,
sessionId,
requestId,
@ -47,19 +50,32 @@ export function recordToolCallUsage({
pricingPlan?: PricingPlan
tool?: Tool
resolvedOriginToolCallResult?: ResolvedOriginToolCallResult
httpResponse?: Response
mcpToolCallResponse?: McpToolCallResponse
ip?: string
sessionId: string
requestId?: string
env: RawEnv
waitUntil: WaitUntil
}): void {
} & (
| {
// For http requests, an http response is required.
requestMode: 'http'
httpResponse: Response
mcpToolCallResponse?: never
}
| {
// For mcp tool call requests, an mcp tool call response is required.
requestMode: 'mcp'
httpResponse?: never
mcpToolCallResponse: McpToolCallResponse
}
)): void {
const { projectId } = deployment
const {
rateLimitResult,
cacheStatus,
originResponse,
originTimespanMs,
toolCallResponse,
toolCallArgs,
numRequestsCost,
reportUsage
@ -67,18 +83,16 @@ export function recordToolCallUsage({
numRequestsCost: 0,
reportUsage: false
}
mcpToolCallResponse ??= resolvedOriginToolCallResult?.toolCallResponse
const requestSize = resolvedOriginToolCallResult
? JSON.stringify(toolCallArgs).length
: 0
const responseSize = resolvedOriginToolCallResult
? Number.parseInt(originResponse?.headers.get('content-length') ?? '0') ||
JSON.stringify(toolCallResponse).length
: 0
const requestSize = toolCallArgs ? JSON.stringify(toolCallArgs).length : 0
const responseSize =
Number.parseInt(httpResponse?.headers.get('content-length') ?? '0') ||
(mcpToolCallResponse ? JSON.stringify(mcpToolCallResponse).length : 0)
// The string dimensions used for grouping and filtering (sometimes called
// labels in other metrics systems).
// NOTE: It is important that the ordering of these fields remains consistent!
// NOTE: The ordering of these fields is important and must remain consistent!
// Max of 20 blobs with total size <= 5120 bytes.
const blobs = [
// Project ID of the request
@ -106,24 +120,27 @@ export function recordToolCallUsage({
consumer?.stripeStatus ?? null,
// Whether the request was rate-limited
resolvedOriginToolCallResult
? rateLimitResult?.passed
rateLimitResult
? rateLimitResult.passed
? 'rl-passed'
: 'rl-exceeded'
: null,
: mcpToolCallResponse?._meta?.status === 429
? 'rl-exceeded'
: null,
// Whether the request hit the cache
cacheStatus ?? null,
// Response status
resolvedOriginToolCallResult
? originResponse?.status?.toString() ||
(toolCallResponse ? (toolCallResponse.isError ? 'error' : '200') : null)
: 'error'
// HTTP response status
httpResponse?.status?.toString() ||
(mcpToolCallResponse
? mcpToolCallResponse._meta?.status?.toString() ||
(mcpToolCallResponse?.isError ? 'error' : '200')
: 'error')
]
// Numeric values to record in this data point.
// NOTE: It is important that the ordering of these fields remains consistent!
// NOTE: The ordering of these fields is important and must remain consistent!
const doubles = [
// Origin timespan in milliseconds
originTimespanMs ?? 0,

Wyświetl plik

@ -15,6 +15,8 @@ import {
/**
* Hono error handler that sanitizes all types of internal, http, json-rpc, and
* unexpected errors and responds with an appropriate HTTP Response.
*
* @note This function is synchronous and must never throw.
*/
export function errorHandler(
err: Error | HTTPResponseError,
@ -61,7 +63,12 @@ export function errorHandler(
logger.error(status, err)
if (isProd) {
captureException(err)
try {
captureException(err)
} catch (err_) {
// eslint-disable-next-line no-console
console.error('Error Sentry.captureException failed', err, err_)
}
}
} else {
logger.warn(status, message, err)

Wyświetl plik

@ -29,15 +29,15 @@
- raw
- auth
- custom auth pages for `openauth`
- API gateway
- **API gateway**
- **usage tracking and reporting**
- oauth flow
- https://docs.scalekit.com/guides/mcp/oauth
- openapi-kitchen-sink
- mcp-kitchen-sink
- how to handle binary bodies and responses?
- `recordToolCallUsage` in `finally` block of http flow
- improve logger vs console for non-hono path and util methods
- extra `Sentry` instrumentation (`setUser`, `captureMessage`, etc)
- **Public MCP server interface**
- how does oauth work with this flow?
- proper error handling support within this flow; will currently get generic errors