From b21fbd060413e3e6b5b8a8e23c77929ed3763430 Mon Sep 17 00:00:00 2001 From: Nicolas Hrubec Date: Wed, 1 Jul 2026 18:46:31 +0200 Subject: [PATCH] feat(server-utils): Migrate Anthropic integration to orchestrion Adds an orchestrion based Anthropic integration to server-utils, covering all the APIs (messages, completions, models, beta messages) and both streaming and non-streaming mode. Leaves the core integration intact and only exports the necessary utils that are needed for the orchestrion integration. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../instrument-orchestrion-with-pii.mjs | 18 ++ .../orchestrion/instrument-orchestrion.mjs | 18 ++ .../tracing/anthropic/orchestrion/test.ts | 230 ++++++++++++++++++ packages/core/src/shared-exports.ts | 11 +- .../core/src/tracing/anthropic-ai/index.ts | 14 +- ...erimentalUseDiagnosticsChannelInjection.ts | 7 +- .../integrations/tracing-channel/anthropic.ts | 168 +++++++++++++ .../server-utils/src/orchestrion/channels.ts | 3 + .../server-utils/src/orchestrion/config.ts | 30 +++ .../server-utils/src/orchestrion/index.ts | 1 + 10 files changed, 494 insertions(+), 6 deletions(-) create mode 100644 dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion-with-pii.mjs create mode 100644 dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion.mjs create mode 100644 dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/test.ts create mode 100644 packages/server-utils/src/integrations/tracing-channel/anthropic.ts diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion-with-pii.mjs b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion-with-pii.mjs new file mode 100644 index 000000000000..c3ee807ae28c --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion-with-pii.mjs @@ -0,0 +1,18 @@ +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.experimentalUseDiagnosticsChannelInjection(); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + dataCollection: { genAI: { inputs: true, outputs: true } }, + transport: loggingTransport, + beforeSendTransaction: event => { + if (event.transaction.includes('/anthropic/v1/')) { + return null; + } + return event; + }, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion.mjs b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion.mjs new file mode 100644 index 000000000000..f277c578a62a --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/instrument-orchestrion.mjs @@ -0,0 +1,18 @@ +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.experimentalUseDiagnosticsChannelInjection(); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + dataCollection: { genAI: { inputs: false, outputs: false } }, + transport: loggingTransport, + beforeSendTransaction: event => { + if (event.transaction.includes('/anthropic/v1/')) { + return null; + } + return event; + }, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/test.ts b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/test.ts new file mode 100644 index 000000000000..cc76711e55a6 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/orchestrion/test.ts @@ -0,0 +1,230 @@ +import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '@sentry/core'; +import { afterAll, describe, expect } from 'vitest'; +import { + GEN_AI_INPUT_MESSAGES_ATTRIBUTE, + GEN_AI_OPERATION_NAME_ATTRIBUTE, + GEN_AI_REQUEST_MODEL_ATTRIBUTE, + GEN_AI_REQUEST_STREAM_ATTRIBUTE, + GEN_AI_RESPONSE_ID_ATTRIBUTE, + GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, + GEN_AI_RESPONSE_TEXT_ATTRIBUTE, + GEN_AI_SYSTEM_ATTRIBUTE, + GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, +} from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; +import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../../utils/runner'; + +// The origin distinguishes the orchestrion (diagnostics-channel) path from the +// OTel/proxy one (`auto.ai.anthropic`). +const ORCHESTRION_ORIGIN = 'auto.ai.orchestrion.anthropic'; + +describe('Anthropic integration (orchestrion)', () => { + afterAll(() => { + cleanupChildProcesses(); + }); + + createEsmAndCjsTests(__dirname, '../scenario.mjs', 'instrument-orchestrion.mjs', (createRunner, test) => { + test('creates anthropic spans via the diagnostics-channel path', async () => { + await createRunner() + .ignore('event') + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + const completionSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'msg_mock123', + ); + expect(completionSpan).toBeDefined(); + expect(completionSpan!.name).toBe('chat claude-3-haiku-20240307'); + expect(completionSpan!.status).toBe('ok'); + expect(completionSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + expect(completionSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]).toEqual({ + type: 'string', + value: 'gen_ai.chat', + }); + expect(completionSpan!.attributes[GEN_AI_OPERATION_NAME_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'chat', + }); + expect(completionSpan!.attributes[GEN_AI_SYSTEM_ATTRIBUTE]).toEqual({ type: 'string', value: 'anthropic' }); + expect(completionSpan!.attributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'claude-3-haiku-20240307', + }); + expect(completionSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 10, + }); + expect(completionSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 15, + }); + expect(completionSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 25, + }); + // Recording disabled: no inputs/outputs captured. + expect(completionSpan!.attributes[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]).toBeUndefined(); + expect(completionSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toBeUndefined(); + + const errorSpan = container.items.find(span => span.name === 'chat error-model'); + expect(errorSpan).toBeDefined(); + expect(errorSpan!.status).not.toBe('ok'); + expect(errorSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + + const tokenCountingSpan = container.items.find( + span => + span.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]?.value === 'gen_ai.chat' && + span.status === 'ok' && + span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE] === undefined, + ); + expect(tokenCountingSpan).toBeDefined(); + expect(tokenCountingSpan!.name).toBe('chat claude-3-haiku-20240307'); + + const modelsSpan = container.items.find(span => span.name === 'models claude-3-haiku-20240307'); + expect(modelsSpan).toBeDefined(); + expect(modelsSpan!.status).toBe('ok'); + expect(modelsSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]).toEqual({ + type: 'string', + value: 'gen_ai.models', + }); + expect(modelsSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + + // messages.create({ stream: true }) — the async-iterable `Stream` (`stream: true` in the request). + const streamingCreateSpan = container.items.find( + span => + span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'msg_stream123' && + span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE]?.value === true, + ); + expect(streamingCreateSpan).toBeDefined(); + expect(streamingCreateSpan!.status).toBe('ok'); + expect(streamingCreateSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + expect(streamingCreateSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({ + type: 'boolean', + value: true, + }); + expect(streamingCreateSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 10, + }); + expect(streamingCreateSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 15, + }); + expect(streamingCreateSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 25, + }); + }, + }) + .start() + .completed(); + }); + }); + + createEsmAndCjsTests(__dirname, '../scenario.mjs', 'instrument-orchestrion-with-pii.mjs', (createRunner, test) => { + test('records inputs and outputs when PII is enabled', async () => { + await createRunner() + .ignore('event') + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + const completionSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'msg_mock123', + ); + expect(completionSpan).toBeDefined(); + expect(completionSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + expect(completionSpan!.attributes[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]).toEqual({ + type: 'string', + value: '[{"role":"user","content":"What is the capital of France?"}]', + }); + expect(completionSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'Hello from Anthropic mock!', + }); + + const streamingCreateSpan = container.items.find( + span => + span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'msg_stream123' && + span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE]?.value === true, + ); + expect(streamingCreateSpan).toBeDefined(); + expect(streamingCreateSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'Hello from stream!', + }); + }, + }) + .start() + .completed(); + }); + }); + + createEsmAndCjsTests( + __dirname, + '../scenario-stream.mjs', + 'instrument-orchestrion-with-pii.mjs', + (createRunner, test) => { + test('creates a span for the messages.stream() emitter path', async () => { + await createRunner() + .ignore('event') + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + // The emitter span from `stream()` itself carries no `stream` request param, unlike + // `messages.create({ stream: true })`. + const messageStreamSpan = container.items.find( + span => + span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'msg_stream_1' && + span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE] === undefined, + ); + expect(messageStreamSpan).toBeDefined(); + expect(messageStreamSpan!.name).toBe('chat claude-3-haiku-20240307'); + expect(messageStreamSpan!.status).toBe('ok'); + expect(messageStreamSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + expect(messageStreamSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]).toEqual({ + type: 'string', + value: 'gen_ai.chat', + }); + expect(messageStreamSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({ + type: 'boolean', + value: true, + }); + expect(messageStreamSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 10, + }); + expect(messageStreamSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 15, + }); + expect(messageStreamSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'Hello from stream!', + }); + }, + }) + .start() + .completed(); + }); + }, + ); +}); diff --git a/packages/core/src/shared-exports.ts b/packages/core/src/shared-exports.ts index d22d01c2ede7..d8fa7f9a482c 100644 --- a/packages/core/src/shared-exports.ts +++ b/packages/core/src/shared-exports.ts @@ -177,16 +177,23 @@ export * as metrics from './metrics/public-api'; export type { MetricOptions } from './metrics/public-api'; export { createConsolaReporter } from './integrations/consola'; export { addVercelAiProcessors, getProviderMetadataAttributes } from './tracing/vercel-ai'; -export { getTruncatedJsonString, shouldEnableTruncation } from './tracing/ai/utils'; +export { getTruncatedJsonString, shouldEnableTruncation, resolveAIRecordingOptions } from './tracing/ai/utils'; export { GEN_AI_INPUT_MESSAGES_ORIGINAL_LENGTH_ATTRIBUTE, + GEN_AI_REQUEST_MODEL_ATTRIBUTE, GEN_AI_SYSTEM_INSTRUCTIONS_ATTRIBUTE, } from './tracing/ai/gen-ai-attributes'; export { _INTERNAL_getSpanContextForToolCallId, _INTERNAL_cleanupToolCallSpanContext } from './tracing/vercel-ai/utils'; export { toolCallSpanContextMap as _INTERNAL_toolCallSpanContextMap } from './tracing/vercel-ai/constants'; export { instrumentOpenAiClient } from './tracing/openai'; export { OPENAI_INTEGRATION_NAME } from './tracing/openai/constants'; -export { instrumentAnthropicAiClient } from './tracing/anthropic-ai'; +export { + instrumentAnthropicAiClient, + extractRequestAttributes as extractAnthropicRequestAttributes, + addPrivateRequestAttributes as addAnthropicRequestAttributes, + addResponseAttributes as addAnthropicResponseAttributes, +} from './tracing/anthropic-ai'; +export { instrumentAsyncIterableStream, instrumentMessageStream } from './tracing/anthropic-ai/streaming'; export { ANTHROPIC_AI_INTEGRATION_NAME } from './tracing/anthropic-ai/constants'; export { instrumentGoogleGenAIClient } from './tracing/google-genai'; export { GOOGLE_GENAI_INTEGRATION_NAME } from './tracing/google-genai/constants'; diff --git a/packages/core/src/tracing/anthropic-ai/index.ts b/packages/core/src/tracing/anthropic-ai/index.ts index d42adf6b4f38..4238bcb870db 100644 --- a/packages/core/src/tracing/anthropic-ai/index.ts +++ b/packages/core/src/tracing/anthropic-ai/index.ts @@ -36,7 +36,11 @@ import { handleResponseError, messagesFromParams, setMessagesAttribute } from '. /** * Extract request attributes from method arguments */ -function extractRequestAttributes(args: unknown[], methodPath: string, operationName: string): Record { +export function extractRequestAttributes( + args: unknown[], + methodPath: string, + operationName: string, +): Record { const attributes: Record = { [GEN_AI_SYSTEM_ATTRIBUTE]: 'anthropic', [GEN_AI_OPERATION_NAME_ATTRIBUTE]: operationName, @@ -73,7 +77,11 @@ function extractRequestAttributes(args: unknown[], methodPath: string, operation * Add private request attributes to spans. * This is only recorded if recordInputs is true. */ -function addPrivateRequestAttributes(span: Span, params: Record, enableTruncation: boolean): void { +export function addPrivateRequestAttributes( + span: Span, + params: Record, + enableTruncation: boolean, +): void { const messages = messagesFromParams(params); setMessagesAttribute(span, messages, enableTruncation); @@ -143,7 +151,7 @@ function addMetadataAttributes(span: Span, response: AnthropicAiResponse): void /** * Add response attributes to spans */ -function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void { +export function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void { if (!response || typeof response !== 'object') return; // capture error, do not add attributes if error (they shouldn't exist) diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index 6bba862cdfc0..81aa4aa433be 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -1,6 +1,7 @@ import { mysqlChannelIntegration, lruMemoizerChannelIntegration, + anthropicChannelIntegration, detectOrchestrionSetup, } from '@sentry/server-utils/orchestrion'; import { registerDiagnosticsChannelInjection } from '@sentry/server-utils/orchestrion/register'; @@ -41,7 +42,11 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject */ export function experimentalUseDiagnosticsChannelInjection(): void { setDiagnosticsChannelInjectionLoader((): DiagnosticsChannelInjection => { - const integrations = [mysqlChannelIntegration(), lruMemoizerChannelIntegration()] as const; + const integrations = [ + mysqlChannelIntegration(), + lruMemoizerChannelIntegration(), + anthropicChannelIntegration(), + ] as const; const replacedOtelIntegrationNames = integrations.map(i => i.name); return { diff --git a/packages/server-utils/src/integrations/tracing-channel/anthropic.ts b/packages/server-utils/src/integrations/tracing-channel/anthropic.ts new file mode 100644 index 000000000000..7f9df0899322 --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/anthropic.ts @@ -0,0 +1,168 @@ +import * as diagnosticsChannel from 'node:diagnostics_channel'; +import type { AnthropicAiOptions, AnthropicAiResponse, IntegrationFn, Span, SpanAttributeValue } from '@sentry/core'; +import { + addAnthropicRequestAttributes, + addAnthropicResponseAttributes, + debug, + defineIntegration, + extractAnthropicRequestAttributes, + GEN_AI_REQUEST_MODEL_ATTRIBUTE, + instrumentAsyncIterableStream, + instrumentMessageStream, + resolveAIRecordingOptions, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + shouldEnableTruncation, + startInactiveSpan, + waitForTracingChannelBinding, +} from '@sentry/core'; +import { DEBUG_BUILD } from '../../debug-build'; +import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; + +// Same name as the OTel integration by design: when enabled, the OTel 'Anthropic_AI' +// integration is dropped from the default set (see the Node opt-in loader). +const INTEGRATION_NAME = 'Anthropic_AI' as const; + +// Distinct from the proxy's `auto.ai.anthropic` so spans from the orchestrion path +// are attributable separately from the OTel/proxy one. +const ORIGIN = 'auto.ai.orchestrion.anthropic'; + +// `stream` determines how the span is ended +const INSTRUMENTED_CHANNELS = [ + { channel: CHANNELS.ANTHROPIC_CHAT, operation: 'chat', methodPath: 'messages.create', stream: 'async-iterable' }, + { channel: CHANNELS.ANTHROPIC_MODELS, operation: 'models', methodPath: 'models.retrieve', stream: 'none' }, + { + channel: CHANNELS.ANTHROPIC_MESSAGES_STREAM, + operation: 'chat', + methodPath: 'messages.stream', + stream: 'message-stream', + }, +] as const; + +type StreamMode = (typeof INSTRUMENTED_CHANNELS)[number]['stream']; + +interface AnthropicChannelContext { + arguments: unknown[]; + result?: unknown; +} + +let subscribed = false; + +const _anthropicChannelIntegration = ((options: AnthropicAiOptions = {}) => { + return { + name: INTEGRATION_NAME, + setupOnce() { + // tracingChannel is unavailable before Node 18.19 and prevent double-subscribe + if (!diagnosticsChannel.tracingChannel || subscribed) { + return; + } + subscribed = true; + + // `bindTracingChannelToSpan` needs the async-context binding that `initOpenTelemetry()` registers + // after `setupOnce` runs, so wait for it before subscribing. + waitForTracingChannelBinding(() => { + for (const { channel, operation, methodPath, stream } of INSTRUMENTED_CHANNELS) { + DEBUG_BUILD && debug.log(`[orchestrion:anthropic] subscribing to channel "${channel}"`); + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(channel), + data => createGenAiSpan(data, operation, methodPath, options), + { + beforeSpanEnd: (span, data) => { + addAnthropicResponseAttributes( + span, + data.result as AnthropicAiResponse, + resolveAIRecordingOptions(options).recordOutputs, + ); + }, + deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, stream, options), + captureError: () => ({ mechanism: { type: ORIGIN, handled: false } }), + }, + ); + } + }); + }, + }; +}) satisfies IntegrationFn; + +/** + * Build the span for an instrumented call. + * Returning `undefined` opts the payload out so no span is opened. + */ +function createGenAiSpan( + data: AnthropicChannelContext, + operation: string, + methodPath: string, + options: AnthropicAiOptions, +): Span | undefined { + const args = data.arguments ?? []; + const params = typeof args[0] === 'object' && args[0] !== null ? (args[0] as Record) : undefined; + + const { recordInputs } = resolveAIRecordingOptions(options); + const enableTruncation = shouldEnableTruncation(options.enableTruncation); + + const attributes = extractAnthropicRequestAttributes(args, methodPath, operation); + const model = (attributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; + attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN] = ORIGIN; + + const span = startInactiveSpan({ + name: `${operation} ${model}`, + op: `gen_ai.${operation}`, + attributes: attributes as Record, + }); + + if (recordInputs && params) { + addAnthropicRequestAttributes(span, params, enableTruncation); + } + + return span; +} + +type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator }; +type MessageStreamEmitter = { on: (...args: unknown[]) => void }; + +function isAsyncIterable(value: unknown): value is AsyncIterableStream { + return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function'; +} + +function isMessageStream(value: unknown): value is MessageStreamEmitter { + return !!value && typeof (value as MessageStreamEmitter).on === 'function'; +} + +/** + * Hand span-ending ownership to a streamed result: returns `true` to skip the normal `beforeSpanEnd`, + * `false` for non-streaming results (which end via `beforeSpanEnd`). + * + * - `async-iterable`: patch the `Stream`'s async iterator in place so `instrumentAsyncIterableStream` ends + * the span when iteration finishes. + * - `message-stream`: `instrumentMessageStream` attaches `'message'`/`'error'` listeners that end the span. + */ +function wrapStreamResult( + span: Span, + data: AnthropicChannelContext, + stream: StreamMode, + options: AnthropicAiOptions, +): boolean { + const { recordOutputs } = resolveAIRecordingOptions(options); + const result = data.result; + + if (stream === 'async-iterable' && isAsyncIterable(result)) { + const iterate = result[Symbol.asyncIterator].bind(result); + const instrumented = instrumentAsyncIterableStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs); + result[Symbol.asyncIterator] = () => instrumented; + return true; + } + + if (stream === 'message-stream' && isMessageStream(result)) { + instrumentMessageStream(result, span, recordOutputs); + return true; + } + + return false; +} + +/** + * EXPERIMENTAL — orchestrion-driven Anthropic integration. Subscribes to the `orchestrion:@anthropic-ai/sdk:*` + * diagnostics_channels injected into the SDK's chat (`messages`/`completions`/beta `messages`), `models`, and + * `messages.stream()` methods, so it requires the orchestrion runtime hook or bundler plugin. + */ +export const anthropicChannelIntegration = defineIntegration(_anthropicChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index ad2d8ccdd4dd..bed3bffc479c 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -14,6 +14,9 @@ export const CHANNELS = { MYSQL_QUERY: 'orchestrion:mysql:query', LRU_MEMOIZER_LOAD: 'orchestrion:lru-memoizer:load', + ANTHROPIC_CHAT: 'orchestrion:@anthropic-ai/sdk:chat', + ANTHROPIC_MODELS: 'orchestrion:@anthropic-ai/sdk:models', + ANTHROPIC_MESSAGES_STREAM: 'orchestrion:@anthropic-ai/sdk:messages-stream', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index 104df2185386..d56a4060739b 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -38,6 +38,36 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ module: { name: 'lru-memoizer', versionRange: '>=2.1.0 <4', filePath: 'lib/async.js' }, functionQuery: { functionName: 'memoizedFunction', kind: 'Callback' }, }, + // One entry each for CJS/ESM + ...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].flatMap(filePath => + (['create', 'countTokens'] as const).map(methodName => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName, kind: 'Auto' as const }, + })), + ) satisfies InstrumentationConfig[]), + ...(['resources/completions.js', 'resources/completions.mjs'].map(filePath => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Completions', methodName: 'create', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + ...(['resources/beta/messages/messages.js', 'resources/beta/messages/messages.mjs'].map(filePath => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName: 'create', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + ...(['resources/models.js', 'resources/models.mjs'].map(filePath => ({ + channelName: 'models', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Models', methodName: 'retrieve', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + // `messages.stream()` returns a synchronous emitter, not a promise, so `kind: 'Sync'` is required: + // `Auto`'s promise wrapper never publishes `end` for a non-thenable return, so the span would never end. + ...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].map(filePath => ({ + channelName: 'messages-stream', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName: 'stream', kind: 'Sync' as const }, + })) satisfies InstrumentationConfig[]), ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index 4b182e51ec13..c2143fd24fb0 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,3 +1,4 @@ export { detectOrchestrionSetup } from './detect'; export { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; export { lruMemoizerChannelIntegration } from '../integrations/tracing-channel/lru-memoizer'; +export { anthropicChannelIntegration } from '../integrations/tracing-channel/anthropic';