diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/orchestrion/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/orchestrion/test.ts index b01d5191da38..5c58b5b86bca 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/orchestrion/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/openai/orchestrion/test.ts @@ -8,10 +8,12 @@ import { GEN_AI_REQUEST_DIMENSIONS_ATTRIBUTE, GEN_AI_REQUEST_ENCODING_FORMAT_ATTRIBUTE, GEN_AI_REQUEST_MODEL_ATTRIBUTE, + GEN_AI_REQUEST_STREAM_ATTRIBUTE, GEN_AI_REQUEST_TEMPERATURE_ATTRIBUTE, GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_ID_ATTRIBUTE, GEN_AI_RESPONSE_MODEL_ATTRIBUTE, + GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE, GEN_AI_SYSTEM_ATTRIBUTE, GEN_AI_SYSTEM_INSTRUCTIONS_ATTRIBUTE, @@ -40,7 +42,7 @@ describe('OpenAI integration (orchestrion)', () => { const chatSpans = container.items.filter( span => span.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]?.value === 'gen_ai.chat', ); - expect(chatSpans).toHaveLength(3); + expect(chatSpans).toHaveLength(6); const chatSpan = container.items.find( span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-mock123', @@ -84,7 +86,10 @@ describe('OpenAI integration (orchestrion)', () => { expect(chatSpan!.attributes[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]).toBeUndefined(); expect(chatSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toBeUndefined(); - const errorSpan = container.items.find(span => span.name === 'chat error-model'); + const errorSpan = container.items.find( + span => + span.name === 'chat error-model' && span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE] === undefined, + ); expect(errorSpan).toBeDefined(); // `bindTracingChannelToSpan` sets the error status message to the OpenAI error's message, // so the serialized status is that message rather than the literal 'error' — just assert @@ -139,6 +144,69 @@ describe('OpenAI integration (orchestrion)', () => { type: 'integer', value: 13, }); + + const streamingChatSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-stream-123', + ); + expect(streamingChatSpan).toBeDefined(); + expect(streamingChatSpan!.status).toBe('ok'); + expect(streamingChatSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({ + type: 'string', + value: ORCHESTRION_ORIGIN, + }); + expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({ + type: 'boolean', + value: true, + }); + expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]).toEqual({ + type: 'string', + value: '["stop"]', + }); + expect(streamingChatSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 12, + }); + expect(streamingChatSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 18, + }); + expect(streamingChatSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 30, + }); + + const streamingResponsesSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'resp_stream_456', + ); + expect(streamingResponsesSpan).toBeDefined(); + expect(streamingResponsesSpan!.status).toBe('ok'); + expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({ + type: 'boolean', + value: true, + }); + expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]).toEqual({ + type: 'string', + value: '["in_progress","completed"]', + }); + expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 6, + }); + expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 10, + }); + expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({ + type: 'integer', + value: 16, + }); + + const streamingErrorSpan = container.items.find( + span => + span.name === 'chat error-model' && span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE]?.value === true, + ); + expect(streamingErrorSpan).toBeDefined(); + expect(streamingErrorSpan!.status).not.toBe('ok'); }, }) .start() @@ -199,6 +267,24 @@ describe('OpenAI integration (orchestrion)', () => { type: 'string', value: 'Response to: Translate this to French: Hello', }); + + const streamingChatSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-stream-123', + ); + expect(streamingChatSpan).toBeDefined(); + expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'Hello from OpenAI streaming!', + }); + + const streamingResponsesSpan = container.items.find( + span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'resp_stream_456', + ); + expect(streamingResponsesSpan).toBeDefined(); + expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({ + type: 'string', + value: 'Streaming response to: Test streaming responses APITest streaming responses API', + }); }, }) .start() diff --git a/packages/core/src/shared-exports.ts b/packages/core/src/shared-exports.ts index 515f13ad9ae4..88a0ef82b7e8 100644 --- a/packages/core/src/shared-exports.ts +++ b/packages/core/src/shared-exports.ts @@ -186,6 +186,7 @@ export { _INTERNAL_getSpanContextForToolCallId, _INTERNAL_cleanupToolCallSpanCon export { toolCallSpanContextMap as _INTERNAL_toolCallSpanContextMap } from './tracing/vercel-ai/constants'; export { instrumentOpenAiClient, extractRequestAttributes, addRequestAttributes } from './tracing/openai'; export { addResponseAttributes, extractRequestParameters } from './tracing/openai/utils'; +export { instrumentStream } from './tracing/openai/streaming'; export { OPENAI_INTEGRATION_NAME } from './tracing/openai/constants'; export { instrumentAnthropicAiClient } from './tracing/anthropic-ai'; export { ANTHROPIC_AI_INTEGRATION_NAME } from './tracing/anthropic-ai/constants'; diff --git a/packages/server-utils/src/integrations/tracing-channel/openai.ts b/packages/server-utils/src/integrations/tracing-channel/openai.ts index 9bc7cb916b5c..330863b20a7d 100644 --- a/packages/server-utils/src/integrations/tracing-channel/openai.ts +++ b/packages/server-utils/src/integrations/tracing-channel/openai.ts @@ -6,6 +6,7 @@ import { debug, defineIntegration, extractRequestAttributes, + instrumentStream, resolveAIRecordingOptions, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, shouldEnableTruncation, @@ -64,10 +65,10 @@ const _openaiChannelIntegration = ((options: OpenAiOptions = {}) => { data => createGenAiSpan(data, operation, options), { beforeSpanEnd: (span, data) => { - if ('result' in data) { - addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs); - } + addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs); }, + // Streaming: the result is a `Stream` consumed later, so instrument it and let it end the span. + deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, options), captureError: () => ({ mechanism: { type: ORIGIN, handled: false } }), }, ); @@ -85,11 +86,6 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti const args = data.arguments ?? []; const params = args[0] as Record | undefined; - // streaming is not supported - if (params?.stream === true) { - return undefined; - } - const { recordInputs } = resolveAIRecordingOptions(options); const enableTruncation = shouldEnableTruncation(options.enableTruncation); @@ -110,9 +106,38 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti return span; } +type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator }; + +function isAsyncIterable(value: unknown): value is AsyncIterableStream { + return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function'; +} + +/** + * For a streaming `create({ stream: true })` the result is a `Stream` the caller consumes later. We can't + * swap what `create` returns, but the `Stream` in `data.result` is the same instance the caller holds and + * `asyncEnd` fires before the caller iterates — so we patch its async iterator in place to run through + * `instrumentStream`, which accumulates the streamed attributes and ends the span when iteration finishes. + * Only a streaming call resolves to an async-iterable, so that check alone distinguishes it. Returns `true` + * to hand span-ending ownership to `instrumentStream`; `false` for non-streaming/errored results, which end + * via the normal `beforeSpanEnd` path. + */ +function wrapStreamResult(span: Span, data: OpenAiChatChannelContext, options: OpenAiOptions): boolean { + const result = data.result; + if (!isAsyncIterable(result)) { + return false; + } + + const { recordOutputs } = resolveAIRecordingOptions(options); + const iterate = result[Symbol.asyncIterator].bind(result); + const instrumented = instrumentStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs ?? false); + result[Symbol.asyncIterator] = () => instrumented; + + return true; +} + /** * EXPERIMENTAL — orchestrion-driven OpenAI integration. Subscribes to the `orchestrion:openai:*` - * diagnostics_channels injected into `openai`'s `create` methods, so it requires the orchestrion runtime - * hook or bundler plugin. Covers non-streaming `chat.completions.create` and `responses.create`. + * diagnostics_channels injected into `openai`'s `create` methods (chat completions, responses, embeddings, + * conversations), so it requires the orchestrion runtime hook or bundler plugin. */ export const openaiChannelIntegration = defineIntegration(_openaiChannelIntegration);