diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts index 6ba0dcc08cb3..3068c7c48a4d 100644 --- a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts @@ -20,6 +20,7 @@ import { GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; +import { isOrchestrionEnabled } from '../../../utils'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; describe('Anthropic integration', () => { @@ -168,7 +169,9 @@ describe('Anthropic integration', () => { expect(completionSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE].value).toBe(15); expect(completionSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE].value).toBe(25); expect(completionSpan!.attributes['sentry.op'].value).toBe('gen_ai.chat'); - expect(completionSpan!.attributes['sentry.origin'].value).toBe('auto.ai.anthropic'); + expect(completionSpan!.attributes['sentry.origin'].value).toBe( + isOrchestrionEnabled() ? 'auto.ai.orchestrion.anthropic' : 'auto.ai.anthropic', + ); const errorSpan = container.items.find( span => diff --git a/dev-packages/node-integration-tests/suites/tracing/langchain/test.ts b/dev-packages/node-integration-tests/suites/tracing/langchain/test.ts index b12e3ba31bc4..0debcae19e1a 100644 --- a/dev-packages/node-integration-tests/suites/tracing/langchain/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/langchain/test.ts @@ -21,6 +21,7 @@ import { GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; +import { isOrchestrionEnabled } from '../../../utils'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; import { createEsmTests } from '../../../utils/runner/createEsmAndCjsTests'; @@ -255,7 +256,9 @@ describe('LangChain integration', () => { span: container => { expect(container.items).toHaveLength(2); const anthropicSpan = container.items.find( - span => span.attributes['sentry.origin'].value === 'auto.ai.anthropic', + span => + span.attributes['sentry.origin'].value === + (isOrchestrionEnabled() ? 'auto.ai.orchestrion.anthropic' : 'auto.ai.anthropic'), ); expect(anthropicSpan).toBeDefined(); expect(anthropicSpan!.name).toBe('chat claude-3-5-sonnet-20241022'); diff --git a/packages/core/src/shared-exports.ts b/packages/core/src/shared-exports.ts index 76f0c36cc2a9..dd6bdf6c535b 100644 --- a/packages/core/src/shared-exports.ts +++ b/packages/core/src/shared-exports.ts @@ -178,16 +178,23 @@ export * as metrics from './metrics/public-api'; export type { MetricOptions } from './metrics/public-api'; export { createConsolaReporter } from './integrations/consola'; export { addVercelAiProcessors, getProviderMetadataAttributes } from './tracing/vercel-ai'; -export { getTruncatedJsonString, shouldEnableTruncation } from './tracing/ai/utils'; +export { getTruncatedJsonString, shouldEnableTruncation, resolveAIRecordingOptions } from './tracing/ai/utils'; export { GEN_AI_INPUT_MESSAGES_ORIGINAL_LENGTH_ATTRIBUTE, + GEN_AI_REQUEST_MODEL_ATTRIBUTE, GEN_AI_SYSTEM_INSTRUCTIONS_ATTRIBUTE, } from './tracing/ai/gen-ai-attributes'; export { _INTERNAL_getSpanContextForToolCallId, _INTERNAL_cleanupToolCallSpanContext } from './tracing/vercel-ai/utils'; export { toolCallSpanContextMap as _INTERNAL_toolCallSpanContextMap } from './tracing/vercel-ai/constants'; export { instrumentOpenAiClient } from './tracing/openai'; export { OPENAI_INTEGRATION_NAME } from './tracing/openai/constants'; -export { instrumentAnthropicAiClient } from './tracing/anthropic-ai'; +export { + instrumentAnthropicAiClient, + extractRequestAttributes as extractAnthropicRequestAttributes, + addPrivateRequestAttributes as addAnthropicRequestAttributes, + addResponseAttributes as addAnthropicResponseAttributes, +} from './tracing/anthropic-ai'; +export { instrumentAsyncIterableStream, instrumentMessageStream } from './tracing/anthropic-ai/streaming'; export { ANTHROPIC_AI_INTEGRATION_NAME } from './tracing/anthropic-ai/constants'; export { instrumentGoogleGenAIClient } from './tracing/google-genai'; export { GOOGLE_GENAI_INTEGRATION_NAME } from './tracing/google-genai/constants'; diff --git a/packages/core/src/tracing/anthropic-ai/index.ts b/packages/core/src/tracing/anthropic-ai/index.ts index d42adf6b4f38..4238bcb870db 100644 --- a/packages/core/src/tracing/anthropic-ai/index.ts +++ b/packages/core/src/tracing/anthropic-ai/index.ts @@ -36,7 +36,11 @@ import { handleResponseError, messagesFromParams, setMessagesAttribute } from '. /** * Extract request attributes from method arguments */ -function extractRequestAttributes(args: unknown[], methodPath: string, operationName: string): Record { +export function extractRequestAttributes( + args: unknown[], + methodPath: string, + operationName: string, +): Record { const attributes: Record = { [GEN_AI_SYSTEM_ATTRIBUTE]: 'anthropic', [GEN_AI_OPERATION_NAME_ATTRIBUTE]: operationName, @@ -73,7 +77,11 @@ function extractRequestAttributes(args: unknown[], methodPath: string, operation * Add private request attributes to spans. * This is only recorded if recordInputs is true. */ -function addPrivateRequestAttributes(span: Span, params: Record, enableTruncation: boolean): void { +export function addPrivateRequestAttributes( + span: Span, + params: Record, + enableTruncation: boolean, +): void { const messages = messagesFromParams(params); setMessagesAttribute(span, messages, enableTruncation); @@ -143,7 +151,7 @@ function addMetadataAttributes(span: Span, response: AnthropicAiResponse): void /** * Add response attributes to spans */ -function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void { +export function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void { if (!response || typeof response !== 'object') return; // capture error, do not add attributes if error (they shouldn't exist) diff --git a/packages/server-utils/src/integrations/tracing-channel/anthropic.ts b/packages/server-utils/src/integrations/tracing-channel/anthropic.ts new file mode 100644 index 000000000000..a366d1aa232e --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/anthropic.ts @@ -0,0 +1,184 @@ +import * as diagnosticsChannel from 'node:diagnostics_channel'; +import type { AnthropicAiOptions, AnthropicAiResponse, IntegrationFn, Span, SpanAttributeValue } from '@sentry/core'; +import { + _INTERNAL_shouldSkipAiProviderWrapping, + addAnthropicRequestAttributes, + addAnthropicResponseAttributes, + debug, + defineIntegration, + extractAnthropicRequestAttributes, + GEN_AI_REQUEST_MODEL_ATTRIBUTE, + instrumentAsyncIterableStream, + instrumentMessageStream, + resolveAIRecordingOptions, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + shouldEnableTruncation, + startInactiveSpan, + waitForTracingChannelBinding, +} from '@sentry/core'; +import { DEBUG_BUILD } from '../../debug-build'; +import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; + +// Same name as the OTel integration by design: when enabled, the OTel 'Anthropic_AI' +// integration is dropped from the default set (see the Node opt-in loader). +const INTEGRATION_NAME = 'Anthropic_AI' as const; + +// Distinct from the proxy's `auto.ai.anthropic` so spans from the orchestrion path +// are attributable separately from the OTel/proxy one. +const ORIGIN = 'auto.ai.orchestrion.anthropic'; + +// `stream` determines how the span is ended +const INSTRUMENTED_CHANNELS = [ + { channel: CHANNELS.ANTHROPIC_CHAT, operation: 'chat', methodPath: 'messages.create', stream: 'async-iterable' }, + { channel: CHANNELS.ANTHROPIC_MODELS, operation: 'models', methodPath: 'models.retrieve', stream: 'none' }, + { + channel: CHANNELS.ANTHROPIC_MESSAGES_STREAM, + operation: 'chat', + methodPath: 'messages.stream', + stream: 'message-stream', + }, +] as const; + +type StreamMode = (typeof INSTRUMENTED_CHANNELS)[number]['stream']; + +interface AnthropicChannelContext { + arguments: unknown[]; + result?: unknown; +} + +let subscribed = false; + +const _anthropicChannelIntegration = ((options: AnthropicAiOptions = {}) => { + return { + name: INTEGRATION_NAME, + setupOnce() { + // tracingChannel is unavailable before Node 18.19 and prevent double-subscribe + if (!diagnosticsChannel.tracingChannel || subscribed) { + return; + } + subscribed = true; + + // `bindTracingChannelToSpan` needs the async-context binding that `initOpenTelemetry()` registers + // after `setupOnce` runs, so wait for it before subscribing. + waitForTracingChannelBinding(() => { + for (const { channel, operation, methodPath, stream } of INSTRUMENTED_CHANNELS) { + DEBUG_BUILD && debug.log(`[orchestrion:anthropic] subscribing to channel "${channel}"`); + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(channel), + data => createGenAiSpan(data, operation, methodPath, options), + { + beforeSpanEnd: (span, data) => { + addAnthropicResponseAttributes( + span, + data.result as AnthropicAiResponse, + resolveAIRecordingOptions(options).recordOutputs, + ); + }, + deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, stream, options), + captureError: () => ({ mechanism: { type: ORIGIN, handled: false } }), + }, + ); + } + }); + }, + }; +}) satisfies IntegrationFn; + +/** + * Build the span for an instrumented call. + * Returning `undefined` opts the payload out so no span is opened. + */ +function createGenAiSpan( + data: AnthropicChannelContext, + operation: string, + methodPath: string, + options: AnthropicAiOptions, +): Span | undefined { + const args = data.arguments ?? []; + + // When LangChain (or another provider) is driving the SDK, it records the spans itself and marks this + // provider as skipped — mirror the OTel integration and don't double-instrument. + if (_INTERNAL_shouldSkipAiProviderWrapping(INTEGRATION_NAME)) { + return undefined; + } + + // `messages.stream()` internally calls the instrumented `messages.create({ stream: true })` tagged with + // an `X-Stainless-Helper-Method: 'stream'` header. The messages-stream channel already covers it, so skip + // the nested create to avoid a duplicate span. + const requestOptions = args[1] as { headers?: Record } | undefined; + if (requestOptions?.headers?.['X-Stainless-Helper-Method'] === 'stream') { + return undefined; + } + + const params = typeof args[0] === 'object' && args[0] !== null ? (args[0] as Record) : undefined; + + const { recordInputs } = resolveAIRecordingOptions(options); + const enableTruncation = shouldEnableTruncation(options.enableTruncation); + + const attributes = extractAnthropicRequestAttributes(args, methodPath, operation); + const model = (attributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; + attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN] = ORIGIN; + + const span = startInactiveSpan({ + name: `${operation} ${model}`, + op: `gen_ai.${operation}`, + attributes: attributes as Record, + }); + + if (recordInputs && params) { + addAnthropicRequestAttributes(span, params, enableTruncation); + } + + return span; +} + +type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator }; +type MessageStreamEmitter = { on: (...args: unknown[]) => void }; + +function isAsyncIterable(value: unknown): value is AsyncIterableStream { + return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function'; +} + +function isMessageStream(value: unknown): value is MessageStreamEmitter { + return !!value && typeof (value as MessageStreamEmitter).on === 'function'; +} + +/** + * Hand span-ending ownership to a streamed result: returns `true` to skip the normal `beforeSpanEnd`, + * `false` for non-streaming results (which end via `beforeSpanEnd`). + * + * - `async-iterable`: patch the `Stream`'s async iterator in place so `instrumentAsyncIterableStream` ends + * the span when iteration finishes. + * - `message-stream`: `instrumentMessageStream` attaches `'message'`/`'error'` listeners that end the span. + */ +function wrapStreamResult( + span: Span, + data: AnthropicChannelContext, + stream: StreamMode, + options: AnthropicAiOptions, +): boolean { + const { recordOutputs } = resolveAIRecordingOptions(options); + const result = data.result; + + if (stream === 'async-iterable' && isAsyncIterable(result)) { + const iterate = result[Symbol.asyncIterator].bind(result); + const instrumented = instrumentAsyncIterableStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs); + result[Symbol.asyncIterator] = () => instrumented; + return true; + } + + if (stream === 'message-stream' && isMessageStream(result)) { + instrumentMessageStream(result, span, recordOutputs); + return true; + } + + return false; +} + +/** + * EXPERIMENTAL — orchestrion-driven Anthropic integration. Subscribes to the `orchestrion:@anthropic-ai/sdk:*` + * diagnostics_channels injected into the SDK's chat (`messages`/`completions`/beta `messages`), `models`, and + * `messages.stream()` methods, so it requires the orchestrion runtime hook or bundler plugin. + */ +export const anthropicChannelIntegration = defineIntegration(_anthropicChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index 71cba4d6e8da..f1fbf6e10b9a 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -17,6 +17,9 @@ export const CHANNELS = { PG_QUERY: 'orchestrion:pg:query', PG_CONNECT: 'orchestrion:pg:connect', PGPOOL_CONNECT: 'orchestrion:pg-pool:connect', + ANTHROPIC_CHAT: 'orchestrion:@anthropic-ai/sdk:chat', + ANTHROPIC_MODELS: 'orchestrion:@anthropic-ai/sdk:models', + ANTHROPIC_MESSAGES_STREAM: 'orchestrion:@anthropic-ai/sdk:messages-stream', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index a41e255772c7..80ad107fc126 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -76,6 +76,36 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ module: { name: 'pg-pool', versionRange: '>=2.0.0 <4', filePath: 'index.js' }, functionQuery: { className: 'Pool', methodName: 'connect', kind: 'Auto' }, }, + // One entry each for CJS/ESM + ...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].flatMap(filePath => + (['create', 'countTokens'] as const).map(methodName => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName, kind: 'Auto' as const }, + })), + ) satisfies InstrumentationConfig[]), + ...(['resources/completions.js', 'resources/completions.mjs'].map(filePath => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Completions', methodName: 'create', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + ...(['resources/beta/messages/messages.js', 'resources/beta/messages/messages.mjs'].map(filePath => ({ + channelName: 'chat', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName: 'create', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + ...(['resources/models.js', 'resources/models.mjs'].map(filePath => ({ + channelName: 'models', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Models', methodName: 'retrieve', kind: 'Auto' as const }, + })) satisfies InstrumentationConfig[]), + // `messages.stream()` returns a synchronous emitter, not a promise, so `kind: 'Sync'` is required: + // `Auto`'s promise wrapper never publishes `end` for a non-thenable return, so the span would never end. + ...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].map(filePath => ({ + channelName: 'messages-stream', + module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath }, + functionQuery: { className: 'Messages', methodName: 'stream', kind: 'Sync' as const }, + })) satisfies InstrumentationConfig[]), ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index ecde9e3386ac..b38b6e0890a1 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,9 +1,15 @@ +import { anthropicChannelIntegration } from '../integrations/tracing-channel/anthropic'; import { lruMemoizerChannelIntegration } from '../integrations/tracing-channel/lru-memoizer'; import { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; import { postgresChannelIntegration } from '../integrations/tracing-channel/postgres'; export { detectOrchestrionSetup } from './detect'; -export { lruMemoizerChannelIntegration, mysqlChannelIntegration, postgresChannelIntegration }; +export { + lruMemoizerChannelIntegration, + mysqlChannelIntegration, + postgresChannelIntegration, + anthropicChannelIntegration, +}; /** * The canonical set of orchestrion diagnostics-channel integrations, keyed by their public @@ -18,4 +24,5 @@ export const channelIntegrations = { postgresIntegration: postgresChannelIntegration, mysqlIntegration: mysqlChannelIntegration, lruMemoizerIntegration: lruMemoizerChannelIntegration, + anthropicIntegration: anthropicChannelIntegration, } as const;