Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import {
GEN_AI_REQUEST_DIMENSIONS_ATTRIBUTE,
GEN_AI_REQUEST_ENCODING_FORMAT_ATTRIBUTE,
GEN_AI_REQUEST_MODEL_ATTRIBUTE,
GEN_AI_REQUEST_STREAM_ATTRIBUTE,
GEN_AI_REQUEST_TEMPERATURE_ATTRIBUTE,
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
GEN_AI_RESPONSE_ID_ATTRIBUTE,
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
GEN_AI_SYSTEM_ATTRIBUTE,
GEN_AI_SYSTEM_INSTRUCTIONS_ATTRIBUTE,
Expand Down Expand Up @@ -40,7 +42,7 @@ describe('OpenAI integration (orchestrion)', () => {
const chatSpans = container.items.filter(
span => span.attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP]?.value === 'gen_ai.chat',
);
expect(chatSpans).toHaveLength(3);
expect(chatSpans).toHaveLength(6);

const chatSpan = container.items.find(
span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-mock123',
Expand Down Expand Up @@ -84,7 +86,10 @@ describe('OpenAI integration (orchestrion)', () => {
expect(chatSpan!.attributes[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]).toBeUndefined();
expect(chatSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toBeUndefined();

const errorSpan = container.items.find(span => span.name === 'chat error-model');
const errorSpan = container.items.find(
span =>
span.name === 'chat error-model' && span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE] === undefined,
);
expect(errorSpan).toBeDefined();
// `bindTracingChannelToSpan` sets the error status message to the OpenAI error's message,
// so the serialized status is that message rather than the literal 'error' — just assert
Expand Down Expand Up @@ -139,6 +144,69 @@ describe('OpenAI integration (orchestrion)', () => {
type: 'integer',
value: 13,
});

const streamingChatSpan = container.items.find(
span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-stream-123',
);
expect(streamingChatSpan).toBeDefined();
expect(streamingChatSpan!.status).toBe('ok');
expect(streamingChatSpan!.attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]).toEqual({
type: 'string',
value: ORCHESTRION_ORIGIN,
});
expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({
type: 'boolean',
value: true,
});
expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]).toEqual({
type: 'string',
value: '["stop"]',
});
expect(streamingChatSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 12,
});
expect(streamingChatSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 18,
});
expect(streamingChatSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 30,
});

const streamingResponsesSpan = container.items.find(
span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'resp_stream_456',
);
expect(streamingResponsesSpan).toBeDefined();
expect(streamingResponsesSpan!.status).toBe('ok');
expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]).toEqual({
type: 'boolean',
value: true,
});
expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]).toEqual({
type: 'string',
value: '["in_progress","completed"]',
});
expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 6,
});
expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 10,
});
expect(streamingResponsesSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]).toEqual({
type: 'integer',
value: 16,
});

const streamingErrorSpan = container.items.find(
span =>
span.name === 'chat error-model' && span.attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE]?.value === true,
);
expect(streamingErrorSpan).toBeDefined();
expect(streamingErrorSpan!.status).not.toBe('ok');
},
})
.start()
Expand Down Expand Up @@ -199,6 +267,24 @@ describe('OpenAI integration (orchestrion)', () => {
type: 'string',
value: 'Response to: Translate this to French: Hello',
});

const streamingChatSpan = container.items.find(
span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'chatcmpl-stream-123',
);
expect(streamingChatSpan).toBeDefined();
expect(streamingChatSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({
type: 'string',
value: 'Hello from OpenAI streaming!',
});

const streamingResponsesSpan = container.items.find(
span => span.attributes[GEN_AI_RESPONSE_ID_ATTRIBUTE]?.value === 'resp_stream_456',
);
expect(streamingResponsesSpan).toBeDefined();
expect(streamingResponsesSpan!.attributes[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]).toEqual({
type: 'string',
value: 'Streaming response to: Test streaming responses APITest streaming responses API',
});
},
})
.start()
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/shared-exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export { _INTERNAL_getSpanContextForToolCallId, _INTERNAL_cleanupToolCallSpanCon
export { toolCallSpanContextMap as _INTERNAL_toolCallSpanContextMap } from './tracing/vercel-ai/constants';
export { instrumentOpenAiClient, extractRequestAttributes, addRequestAttributes } from './tracing/openai';
export { addResponseAttributes, extractRequestParameters } from './tracing/openai/utils';
export { instrumentStream } from './tracing/openai/streaming';
export { OPENAI_INTEGRATION_NAME } from './tracing/openai/constants';
export { instrumentAnthropicAiClient } from './tracing/anthropic-ai';
export { ANTHROPIC_AI_INTEGRATION_NAME } from './tracing/anthropic-ai/constants';
Expand Down
45 changes: 35 additions & 10 deletions packages/server-utils/src/integrations/tracing-channel/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
debug,
defineIntegration,
extractRequestAttributes,
instrumentStream,
resolveAIRecordingOptions,
SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN,
shouldEnableTruncation,
Expand Down Expand Up @@ -64,10 +65,10 @@ const _openaiChannelIntegration = ((options: OpenAiOptions = {}) => {
data => createGenAiSpan(data, operation, options),
{
beforeSpanEnd: (span, data) => {
if ('result' in data) {
addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs);
}
addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs);
},
// Streaming: the result is a `Stream` consumed later, so instrument it and let it end the span.
deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, options),
captureError: () => ({ mechanism: { type: ORIGIN, handled: false } }),
},
);
Expand All @@ -85,11 +86,6 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti
const args = data.arguments ?? [];
const params = args[0] as Record<string, unknown> | undefined;

// streaming is not supported
if (params?.stream === true) {
return undefined;
}

const { recordInputs } = resolveAIRecordingOptions(options);
const enableTruncation = shouldEnableTruncation(options.enableTruncation);

Expand All @@ -110,9 +106,38 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti
return span;
}

type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator<unknown> };

function isAsyncIterable(value: unknown): value is AsyncIterableStream {
return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function';
}

/**
* For a streaming `create({ stream: true })` the result is a `Stream` the caller consumes later. We can't
* swap what `create` returns, but the `Stream` in `data.result` is the same instance the caller holds and
* `asyncEnd` fires before the caller iterates — so we patch its async iterator in place to run through
* `instrumentStream`, which accumulates the streamed attributes and ends the span when iteration finishes.
* Only a streaming call resolves to an async-iterable, so that check alone distinguishes it. Returns `true`
* to hand span-ending ownership to `instrumentStream`; `false` for non-streaming/errored results, which end
* via the normal `beforeSpanEnd` path.
*/
function wrapStreamResult(span: Span, data: OpenAiChatChannelContext, options: OpenAiOptions): boolean {
const result = data.result;
if (!isAsyncIterable(result)) {
return false;
}

const { recordOutputs } = resolveAIRecordingOptions(options);
const iterate = result[Symbol.asyncIterator].bind(result);
const instrumented = instrumentStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs ?? false);
result[Symbol.asyncIterator] = () => instrumented;

return true;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open streaming spans never end

Medium Severity

When wrapStreamResult defers span completion for stream: true, the span is only finished inside instrumentStream's finally block after async iteration. If the caller resolves create() but never drives the returned Stream (no for await, abandoned reference, or fire-and-forget), deferSpanEnd skips the normal span.end() path and the gen_ai span can remain open indefinitely.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 115b544. Configure here.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream errors leave span ok

Medium Severity

When deferSpanEnd hands off a streaming result, wrapStreamResult never uses the provided end callback. If async iteration fails (network abort, parse error, etc.), instrumentStream still runs endStreamSpan in its finally block and ends the span without error status, and the integration’s captureError / error annotation path is skipped—unlike rejected create() calls or the mysql orchestrion deferSpanEnd pattern.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9ca6a54. Configure here.

}

/**
* EXPERIMENTAL — orchestrion-driven OpenAI integration. Subscribes to the `orchestrion:openai:*`
* diagnostics_channels injected into `openai`'s `create` methods, so it requires the orchestrion runtime
* hook or bundler plugin. Covers non-streaming `chat.completions.create` and `responses.create`.
* diagnostics_channels injected into `openai`'s `create` methods (chat completions, responses, embeddings,
* conversations), so it requires the orchestrion runtime hook or bundler plugin.
*/
export const openaiChannelIntegration = defineIntegration(_openaiChannelIntegration);
Loading