feat: clean up gateway resolving edge request logic

pull/715/head
Travis Fischer 2025-06-13 07:00:05 +07:00
rodzic b066de9c5f
commit b61641240e
10 zmienionych plików z 202 dodań i 188 usunięć

Wyświetl plik

@ -37,6 +37,7 @@
"@agentic/platform-hono": "workspace:*",
"@agentic/platform-types": "workspace:*",
"@agentic/platform-validators": "workspace:*",
"@cloudflare/workers-oauth-provider": "^0.0.5",
"@hono/zod-validator": "catalog:",
"@modelcontextprotocol/sdk": "catalog:",
"@sentry/cloudflare": "catalog:",

Wyświetl plik

@ -7,17 +7,18 @@ import {
responseTime,
sentry
} from '@agentic/platform-hono'
import { parseToolIdentifier } from '@agentic/platform-validators'
import { Hono } from 'hono'
import type { GatewayHonoEnv, ResolvedOriginToolCallResult } from './lib/types'
import type {
GatewayHonoEnv,
ResolvedHttpEdgeRequest,
ResolvedOriginToolCallResult
} from './lib/types'
import { createAgenticClient } from './lib/agentic-client'
import { createHttpResponseFromMcpToolCallResponse } from './lib/create-http-response-from-mcp-tool-call-response'
import { recordToolCallUsage } from './lib/record-tool-call-usage'
import {
type ResolvedHttpEdgeRequest,
resolveHttpEdgeRequest
} from './lib/resolve-http-edge-request'
import { resolveEdgeRequest } from './lib/resolve-edge-request'
import { resolveHttpEdgeRequest } from './lib/resolve-http-edge-request'
import { resolveMcpEdgeRequest } from './lib/resolve-mcp-edge-request'
import { resolveOriginToolCall } from './lib/resolve-origin-tool-call'
import { isRequestPubliclyCacheable } from './lib/utils'
@ -65,21 +66,17 @@ app.all(async (ctx) => {
})
)
// TODO: Clean up the duplication between this block,
// `resolveMcpEdgeRequest`, and `resolveHttpEdgeRequest`.
const requestUrl = new URL(ctx.req.url)
const { pathname } = requestUrl
const requestedToolIdentifier = pathname.replace(/^\//, '').replace(/\/$/, '')
const { toolName } = parseToolIdentifier(requestedToolIdentifier)
const resolvedEdgeRequest = await resolveEdgeRequest(ctx)
const { toolName } = resolvedEdgeRequest.parsedToolIdentifier
// Handle MCP requests
if (toolName === 'mcp') {
ctx.set('isJsonRpcRequest', true)
const executionCtx = ctx.executionCtx as any
const mcpInfo = await resolveMcpEdgeRequest(ctx)
const mcpInfo = await resolveMcpEdgeRequest(ctx, resolvedEdgeRequest)
executionCtx.props = mcpInfo
return DurableMcpServer.serve(pathname, {
return DurableMcpServer.serve('/*', {
binding: 'DO_MCP_SERVER'
}).fetch(ctx.req.raw, ctx.env, executionCtx)
}
@ -92,14 +89,16 @@ app.all(async (ctx) => {
try {
// Resolve the http edge request to a specific deployment, consumer, and
// tool call.
resolvedHttpEdgeRequest = await resolveHttpEdgeRequest(ctx)
resolvedHttpEdgeRequest = await resolveHttpEdgeRequest(
ctx,
resolvedEdgeRequest
)
// Invoke the origin tool call.
resolvedOriginToolCallResult = await resolveOriginToolCall({
...resolvedHttpEdgeRequest,
args: resolvedHttpEdgeRequest.toolCallArgs,
sessionId: ctx.get('sessionId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil
})
@ -129,12 +128,10 @@ app.all(async (ctx) => {
if (resolvedHttpEdgeRequest && res) {
recordToolCallUsage({
...resolvedHttpEdgeRequest,
requestMode: 'http',
edgeRequestMode: 'http',
httpResponse: res,
resolvedOriginToolCallResult,
sessionId: ctx.get('sessionId')!,
requestId: ctx.get('requestId')!,
ip: ctx.get('ip'),
env: ctx.env,
waitUntil
})

Wyświetl plik

@ -1,4 +1,3 @@
import type { AdminDeployment, PricingPlan } from '@agentic/platform-types'
import { assert, getRateLimitHeaders } from '@agentic/platform-core'
import { parseDeploymentIdentifier } from '@agentic/platform-validators'
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
@ -11,8 +10,8 @@ import { McpAgent } from 'agents/mcp'
import type { RawEnv } from './env'
import type {
AdminConsumer,
McpToolCallResponse,
ResolvedMcpEdgeRequest,
ResolvedOriginToolCallResult
} from './types'
import { handleMcpToolCallError } from './handle-mcp-tool-call-error'
@ -23,13 +22,8 @@ import { createAgenticMcpMetadata } from './utils'
export class DurableMcpServerBase extends McpAgent<
RawEnv,
never, // TODO: do we need local state?
{
deployment: AdminDeployment
consumer?: AdminConsumer
pricingPlan?: PricingPlan
ip?: string
}
never, // We aren't currently using local state, so set it to `never`.
ResolvedMcpEdgeRequest
> {
protected _serverP = Promise.withResolvers<Server>()
override server = this._serverP.promise
@ -40,7 +34,7 @@ export class DurableMcpServerBase extends McpAgent<
}
override async init() {
const { consumer, deployment, pricingPlan, ip } = this.props
const { consumer, deployment, pricingPlan } = this.props
const { projectIdentifier } = parseDeploymentIdentifier(
deployment.identifier
)
@ -62,20 +56,17 @@ export class DurableMcpServerBase extends McpAgent<
)
if (toolConfig) {
const pricingPlanToolConfig = pricingPlan
const pricingPlanToolOverride = pricingPlan
? toolConfig.pricingPlanOverridesMap?.[pricingPlan.slug]
: undefined
if (pricingPlanToolConfig?.enabled === false) {
// Tool is disabled / hidden for the customer's current pricing plan
if (pricingPlanToolOverride?.enabled === true) {
// Tool is explicitly enabled for the customer's pricing plan
} else if (pricingPlanToolOverride?.enabled === false) {
// Tool is disabled for the customer's pricing plan
return undefined
}
if (
pricingPlanToolConfig?.enabled !== true &&
toolConfig.enabled === false
) {
// Tool is disabled / hidden for all pricing plans
} else if (toolConfig.enabled === false) {
// Tool is disabled for all pricing plans
return undefined
}
}
@ -101,15 +92,12 @@ export class DurableMcpServerBase extends McpAgent<
assert(tool, 404, `Unknown tool "${toolName}"`)
resolvedOriginToolCallResult = await resolveOriginToolCall({
...this.props,
tool,
args,
deployment,
consumer,
pricingPlan,
cacheControl,
sessionId,
env: this.env,
ip,
waitUntil: this.ctx.waitUntil.bind(this.ctx)
})
@ -155,13 +143,11 @@ export class DurableMcpServerBase extends McpAgent<
// Record tool call usage, whether the call was successful or not.
recordToolCallUsage({
...this.props,
requestMode: 'mcp',
edgeRequestMode: 'mcp',
tool,
mcpToolCallResponse: toolCallResponse!,
resolvedOriginToolCallResult,
sessionId,
// TODO: requestId
ip,
env: this.env,
waitUntil: this.ctx.waitUntil.bind(this.ctx)
})

Wyświetl plik

@ -7,8 +7,8 @@ import type {
import type { RawEnv } from './env'
import type {
AdminConsumer,
EdgeRequestMode,
McpToolCallResponse,
RequestMode,
ResolvedOriginToolCallResult,
WaitUntil
} from './types'
@ -31,7 +31,7 @@ import { createStripe } from './external/stripe'
* @see https://developers.cloudflare.com/analytics/analytics-engine/limits/
*/
export function recordToolCallUsage({
requestMode,
edgeRequestMode,
deployment,
consumer,
tool,
@ -44,7 +44,7 @@ export function recordToolCallUsage({
env,
waitUntil
}: {
requestMode: RequestMode
edgeRequestMode: EdgeRequestMode
deployment: AdminDeployment
consumer?: AdminConsumer
pricingPlan?: PricingPlan
@ -60,13 +60,13 @@ export function recordToolCallUsage({
} & (
| {
// For http requests, an http response is required.
requestMode: 'http'
edgeRequestMode: 'http'
httpResponse: Response
mcpToolCallResponse?: never
}
| {
// For mcp tool call requests, an mcp tool call response is required.
requestMode: 'mcp'
edgeRequestMode: 'mcp'
httpResponse?: never
mcpToolCallResponse: McpToolCallResponse
}
@ -105,7 +105,7 @@ export function recordToolCallUsage({
tool?.name ?? null,
// Whether this request was made via MCP or HTTP
requestMode,
edgeRequestMode,
// IP address or session ID
ip ?? sessionId,

Wyświetl plik

@ -0,0 +1,91 @@
import type { AdminDeployment, PricingPlan } from '@agentic/platform-types'
import { assert } from '@agentic/platform-core'
import { parseToolIdentifier } from '@agentic/platform-validators'
import type {
AdminConsumer,
GatewayHonoContext,
ResolvedEdgeRequest
} from './types'
import { getAdminConsumer } from './get-admin-consumer'
import { getAdminDeployment } from './get-admin-deployment'
/**
 * Resolves the parts of an incoming edge request that are shared by both the
 * HTTP and MCP code paths.
 *
 * The request URL's pathname (minus one leading and one trailing slash) is
 * parsed as a tool identifier, and the deployment it refers to is looked up
 * via `getAdminDeployment`.
 *
 * @param ctx - Gateway Hono context for the current request.
 * @returns The parsed tool identifier, the resolved deployment, and the
 * `requestId` / `ip` values currently stored on the context.
 */
export async function resolveEdgeRequest(
  ctx: GatewayHonoContext
): Promise<ResolvedEdgeRequest> {
  // e.g. a pathname of "/foo/bar/" becomes "foo/bar" before parsing.
  const toolPath = new URL(ctx.req.url).pathname
    .replace(/^\//, '')
    .replace(/\/$/, '')

  const parsedToolIdentifier = parseToolIdentifier(toolPath)
  const { deploymentIdentifier } = parsedToolIdentifier
  const deployment = await getAdminDeployment(ctx, deploymentIdentifier)

  return {
    parsedToolIdentifier,
    deployment,
    requestId: ctx.get('requestId'),
    ip: ctx.get('ip')
  }
}
/**
 * Determines which consumer (if any) and which pricing plan apply to an edge
 * request targeting `deployment`.
 *
 * - Without an `apiKey`, no consumer is resolved and the deployment's pricing
 *   plan with slug `free` is used when one exists.
 * - With an `apiKey`, the consumer is fetched via `getAdminConsumer` and must
 *   exist (401), have an active Stripe subscription (402), and belong to the
 *   deployment's project (403). The pricing plan is the deployment plan whose
 *   slug matches the consumer's plan.
 *
 * @param ctx - Gateway Hono context for the current request.
 * @param opts.deployment - Deployment the request resolved to.
 * @param opts.apiKey - API key supplied by the caller; empty / undefined means
 * the request is unauthenticated.
 * @returns The resolved `consumer` and `pricingPlan`; either may be undefined.
 */
export async function resolveConsumerForEdgeRequest(
  ctx: GatewayHonoContext,
  {
    deployment,
    apiKey
  }: {
    deployment: AdminDeployment
    apiKey?: string
  }
): Promise<{
  consumer?: AdminConsumer
  pricingPlan?: PricingPlan
}> {
  if (!apiKey) {
    // Unauthenticated request: fall back to a free pricing plan if available.
    return {
      consumer: undefined,
      pricingPlan: deployment.pricingPlans.find((plan) => plan.slug === 'free')
    }
  }

  const consumer = await getAdminConsumer(ctx, apiKey)
  assert(consumer, 401, `Invalid API key "${apiKey}"`)
  assert(
    consumer.isStripeSubscriptionActive,
    402,
    `API key "${apiKey}" does not have an active subscription`
  )
  assert(
    consumer.projectId === deployment.projectId,
    403,
    `API key "${apiKey}" is not authorized for project "${deployment.projectId}"`
  )

  // TODO: Ensure that consumer.plan is compatible with the target deployment?
  // TODO: This could definitely cause issues when changing pricing plans.
  // NOTE: a consumer whose plan matches no pricing plan on this deployment is
  // currently tolerated (`pricingPlan` stays undefined); rejecting that case
  // with a 403 is intentionally not enforced yet.
  const consumerPlanSlug = consumer.plan
  const pricingPlan = deployment.pricingPlans.find(
    (plan) => plan.slug === consumerPlanSlug
  )

  return {
    consumer,
    pricingPlan
  }
}

Wyświetl plik

@ -1,37 +1,22 @@
import type {
AdminDeployment,
PricingPlan,
Tool
} from '@agentic/platform-types'
import { assert } from '@agentic/platform-core'
import { parseToolIdentifier } from '@agentic/platform-validators'
import type { AdminConsumer, GatewayHonoContext, ToolCallArgs } from './types'
import { getAdminConsumer } from './get-admin-consumer'
import { getAdminDeployment } from './get-admin-deployment'
import type {
GatewayHonoContext,
ResolvedEdgeRequest,
ResolvedHttpEdgeRequest
} from './types'
import { getTool } from './get-tool'
import { getToolArgsFromRequest } from './get-tool-args-from-request'
import { resolveConsumerForEdgeRequest } from './resolve-edge-request'
import { isRequestPubliclyCacheable } from './utils'
export type ResolvedHttpEdgeRequest = {
deployment: AdminDeployment
consumer?: AdminConsumer
pricingPlan?: PricingPlan
tool: Tool
toolCallArgs: ToolCallArgs
cacheControl?: string
}
/**
* Resolves an input HTTP request to a specific deployment, tool call, and
* billing subscription.
*
* Also ensures that the request is valid, enforces rate limits, and adds proxy-
* specific headers to the origin request.
* Resolves an input HTTP request to a specific deployment, tool call, consumer,
* and pricing plan.
*/
export async function resolveHttpEdgeRequest(
ctx: GatewayHonoContext
ctx: GatewayHonoContext,
resolvedEdgeRequest: ResolvedEdgeRequest
): Promise<ResolvedHttpEdgeRequest> {
const logger = ctx.get('logger')
const ip = ctx.get('ip')
@ -40,15 +25,9 @@ export async function resolveHttpEdgeRequest(
? ctx.req.header('cache-control')
: 'no-store'
const { deployment, parsedToolIdentifier } = resolvedEdgeRequest
const { toolName } = parsedToolIdentifier
const { method } = ctx.req
const requestUrl = new URL(ctx.req.url)
const { pathname } = requestUrl
const requestedToolIdentifier = pathname.replace(/^\//, '').replace(/\/$/, '')
const { toolName, deploymentIdentifier } = parseToolIdentifier(
requestedToolIdentifier
)
const deployment = await getAdminDeployment(ctx, deploymentIdentifier)
const tool = getTool({
method,
@ -58,66 +37,36 @@ export async function resolveHttpEdgeRequest(
logger.debug('request', {
method,
pathname,
deploymentIdentifier: deployment.identifier,
toolName,
tool
})
let pricingPlan: PricingPlan | undefined
let consumer: AdminConsumer | undefined
const token = (ctx.req.header('authorization') || '')
const apiKey = (ctx.req.header('authorization') || '')
.replace(/^Bearer /i, '')
.trim()
if (token) {
consumer = await getAdminConsumer(ctx, token)
assert(consumer, 401, `Invalid auth token "${token}"`)
assert(
consumer.isStripeSubscriptionActive,
402,
`Auth token "${token}" does not have an active subscription`
)
assert(
consumer.projectId === deployment.projectId,
403,
`Auth token "${token}" is not authorized for project "${deployment.projectId}"`
)
// TODO: Ensure that consumer.plan is compatible with the target deployment?
// TODO: This could definitely cause issues when changing pricing plans.
pricingPlan = deployment.pricingPlans.find(
(pricingPlan) => consumer!.plan === pricingPlan.slug
)
// assert(
// pricingPlan,
// 403,
// `Auth token "${token}" unable to find matching pricing plan for project "${deployment.project}"`
// )
const { consumer, pricingPlan } = await resolveConsumerForEdgeRequest(ctx, {
deployment,
apiKey
})
if (consumer) {
if (!ctx.get('sessionId')) {
ctx.set('sessionId', `${consumer.id}:${deployment.id}`)
}
} else {
// For unauthenticated requests, default to a free pricing plan if available.
pricingPlan = deployment.pricingPlans.find((plan) => plan.slug === 'free')
if (!ctx.get('sessionId')) {
assert(ip, 500, 'IP address is required for unauthenticated requests')
ctx.set('sessionId', `${ip}:${deployment.projectId}`)
}
}
assert(ctx.get('sessionId'), 500, 'Internal error: sessionId should be set')
// Parse tool call args from the request body.
// Parse tool call arguments from the request body.
const toolCallArgs = await getToolArgsFromRequest(ctx, { tool, deployment })
return {
deployment,
...resolvedEdgeRequest,
consumer,
pricingPlan,
tool,

Wyświetl plik

@ -1,71 +1,27 @@
import type { AdminDeployment, PricingPlan } from '@agentic/platform-types'
import { assert } from '@agentic/platform-core'
import { parseToolIdentifier } from '@agentic/platform-validators'
import type { AdminConsumer, GatewayHonoContext } from './types'
import { getAdminConsumer } from './get-admin-consumer'
import { getAdminDeployment } from './get-admin-deployment'
export type ResolvedMcpEdgeRequest = {
deployment: AdminDeployment
consumer?: AdminConsumer
pricingPlan?: PricingPlan
ip?: string
}
import type {
GatewayHonoContext,
ResolvedEdgeRequest,
ResolvedMcpEdgeRequest
} from './types'
import { resolveConsumerForEdgeRequest } from './resolve-edge-request'
export async function resolveMcpEdgeRequest(
ctx: GatewayHonoContext
ctx: GatewayHonoContext,
resolvedEdgeRequest: ResolvedEdgeRequest
): Promise<ResolvedMcpEdgeRequest> {
const requestUrl = new URL(ctx.req.url)
const { pathname } = requestUrl
const requestedDeploymentIdentifier = pathname
.replace(/^\//, '')
.replace(/\/$/, '')
const { deploymentIdentifier } = parseToolIdentifier(
requestedDeploymentIdentifier
)
const deployment = await getAdminDeployment(ctx, deploymentIdentifier)
const { deployment } = resolvedEdgeRequest
// TODO: Should MCP edge requests also support Authorization header?
const apiKey = ctx.req.query('apiKey')?.trim()
let consumer: AdminConsumer | undefined
let pricingPlan: PricingPlan | undefined
if (apiKey) {
consumer = await getAdminConsumer(ctx, apiKey)
assert(consumer, 401, `Invalid api key "${apiKey}"`)
assert(
consumer.isStripeSubscriptionActive,
402,
`API key "${apiKey}" subscription is not active`
)
assert(
consumer.projectId === deployment.projectId,
403,
`API key "${apiKey}" is not authorized for project "${deployment.projectId}"`
)
// TODO: Ensure that consumer.plan is compatible with the target deployment?
// TODO: This could definitely cause issues when changing pricing plans.
pricingPlan = deployment.pricingPlans.find(
(pricingPlan) => consumer!.plan === pricingPlan.slug
)
// assert(
// pricingPlan,
// 403,
// `API key "${apiKey}" unable to find matching pricing plan for project "${deployment.project}"`
// )
} else {
// For unauthenticated requests, default to a free pricing plan if available.
pricingPlan = deployment.pricingPlans.find((plan) => plan.slug === 'free')
}
const { consumer, pricingPlan } = await resolveConsumerForEdgeRequest(ctx, {
deployment,
apiKey
})
return {
deployment,
...resolvedEdgeRequest,
consumer,
pricingPlan,
ip: ctx.get('ip')
pricingPlan
}
}

Wyświetl plik

@ -123,7 +123,7 @@ export async function resolveOriginToolCall({
const pricingPlanToolOverride = pricingPlan
? toolConfig.pricingPlanOverridesMap?.[pricingPlan.slug]
: undefined
const isToolConfigEnabled = toolConfig.enabled ?? true
const isToolEnabled = toolConfig.enabled ?? true
// Check if this tool is configured for pricing-plan-specific overrides
// which take precedence over the tool's default behavior.
@ -131,11 +131,11 @@ export async function resolveOriginToolCall({
if (pricingPlanToolOverride.enabled !== undefined) {
assert(
pricingPlanToolOverride.enabled,
isToolConfigEnabled ? 403 : 404,
isToolEnabled ? 403 : 404,
`Tool "${tool.name}" is disabled for pricing plan "${pricingPlan.slug}"`
)
} else {
assert(isToolConfigEnabled, 404, `Tool "${tool.name}" is disabled`)
assert(isToolEnabled, 404, `Tool "${tool.name}" is disabled`)
}
if (pricingPlanToolOverride.reportUsage !== undefined) {
@ -146,7 +146,7 @@ export async function resolveOriginToolCall({
rateLimit = pricingPlanToolOverride.rateLimit
}
} else {
assert(isToolConfigEnabled, 404, `Tool "${tool.name}" is disabled`)
assert(isToolEnabled, 404, `Tool "${tool.name}" is disabled`)
}
} else {
if (cacheControl) {

Wyświetl plik

@ -6,10 +6,14 @@ import type {
} from '@agentic/platform-hono'
import type {
AdminConsumer as AdminConsumerImpl,
AdminDeployment,
PricingPlan,
RateLimit,
Tool,
ToolConfig,
User
} from '@agentic/platform-types'
import type { ParsedToolIdentifier } from '@agentic/platform-validators'
import type { Client as McpClient } from '@modelcontextprotocol/sdk/client/index.js'
import type { Context } from 'hono'
import type { Simplify } from 'type-fest'
@ -57,10 +61,30 @@ export type RateLimitState = {
export type RateLimitCache = Map<string, RateLimitState>
export type CacheStatus = 'HIT' | 'MISS' | 'BYPASS' | 'DYNAMIC'
export type RequestMode = 'mcp' | 'http'
export type EdgeRequestMode = 'mcp' | 'http'
export type WaitUntil = (promise: Promise<any>) => void
/**
 * Request-level information shared by both HTTP and MCP edge requests, as
 * produced by `resolveEdgeRequest`.
 *
 * NOTE(review): the `Record<string, unknown>` base presumably exists so that
 * derived types can be used as the props type argument of `McpAgent` —
 * confirm before removing it.
 */
export interface ResolvedEdgeRequest extends Record<string, unknown> {
// Result of parsing the request pathname as a tool identifier.
parsedToolIdentifier: ParsedToolIdentifier
// Deployment looked up from the parsed deployment identifier.
deployment: AdminDeployment
// Value of `requestId` read from the gateway context.
requestId: string
// Value of `ip` read from the gateway context, when one is set.
ip?: string
}
/**
 * A resolved edge request for the MCP code path: the shared edge request
 * fields plus the consumer and pricing plan resolved for the caller.
 */
export interface ResolvedMcpEdgeRequest extends ResolvedEdgeRequest {
// Consumer resolved from the caller's API key; absent for unauthenticated requests.
consumer?: AdminConsumer
// Pricing plan selected for this request; absent when no matching plan was found.
pricingPlan?: PricingPlan
}
/**
 * A resolved edge request for the HTTP code path: the shared edge request
 * fields plus the consumer, pricing plan, and the specific tool call being
 * made.
 */
export interface ResolvedHttpEdgeRequest extends ResolvedEdgeRequest {
// Consumer resolved from the caller's API key; absent for unauthenticated requests.
consumer?: AdminConsumer
// Pricing plan selected for this request; absent when no matching plan was found.
pricingPlan?: PricingPlan
// Tool targeted by this request.
tool: Tool
// Arguments for the tool call, parsed from the incoming request.
toolCallArgs: ToolCallArgs
// Cache-control value to apply to the tool call, if any.
cacheControl?: string
}
export type ResolvedOriginToolCallResult = {
toolCallArgs: ToolCallArgs
originRequest?: Request

Wyświetl plik

@ -463,6 +463,9 @@ importers:
'@agentic/platform-validators':
specifier: workspace:*
version: link:../../packages/validators
'@cloudflare/workers-oauth-provider':
specifier: ^0.0.5
version: 0.0.5
'@hono/zod-validator':
specifier: 'catalog:'
version: 0.7.0(hono@4.7.11)(zod@3.25.62)
@ -969,6 +972,9 @@ packages:
cpu: [x64]
os: [win32]
'@cloudflare/workers-oauth-provider@0.0.5':
resolution: {integrity: sha512-t1x5KAzsubCvb4APnJ93z407X1x7SGj/ga5ziRnwIb/iLy4PMkT/hgd1y5z7Bbsdy5Fy6mywhCP4lym24bX66w==}
'@cloudflare/workers-types@4.20250610.0':
resolution: {integrity: sha512-HxnUoey3QxCEfy07pUm7J42jBi9YPHq/hA3fw6JmOqYLHdviHI28OA8lup+2RUaHwDzh6q1DSfrBvvDqde645A==}
@ -5986,6 +5992,10 @@ snapshots:
'@cloudflare/workerd-windows-64@1.20250604.0':
optional: true
'@cloudflare/workers-oauth-provider@0.0.5':
dependencies:
'@cloudflare/workers-types': 4.20250610.0
'@cloudflare/workers-types@4.20250610.0': {}
'@commander-js/extra-typings@14.0.0(commander@14.0.0)':