-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(node): Add streaming to orchestrion OpenAI integration #21886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import { | |
| debug, | ||
| defineIntegration, | ||
| extractRequestAttributes, | ||
| instrumentStream, | ||
| resolveAIRecordingOptions, | ||
| SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, | ||
| shouldEnableTruncation, | ||
|
|
@@ -64,10 +65,10 @@ const _openaiChannelIntegration = ((options: OpenAiOptions = {}) => { | |
| data => createGenAiSpan(data, operation, options), | ||
| { | ||
| beforeSpanEnd: (span, data) => { | ||
| if ('result' in data) { | ||
| addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs); | ||
| } | ||
| addResponseAttributes(span, data.result, resolveAIRecordingOptions(options).recordOutputs); | ||
| }, | ||
| // Streaming: the result is a `Stream` consumed later, so instrument it and let it end the span. | ||
| deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, options), | ||
| captureError: () => ({ mechanism: { type: ORIGIN, handled: false } }), | ||
| }, | ||
| ); | ||
|
|
@@ -85,11 +86,6 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti | |
| const args = data.arguments ?? []; | ||
| const params = args[0] as Record<string, unknown> | undefined; | ||
|
|
||
| // streaming is not supported | ||
| if (params?.stream === true) { | ||
| return undefined; | ||
| } | ||
|
|
||
| const { recordInputs } = resolveAIRecordingOptions(options); | ||
| const enableTruncation = shouldEnableTruncation(options.enableTruncation); | ||
|
|
||
|
|
@@ -110,9 +106,38 @@ function createGenAiSpan(data: OpenAiChatChannelContext, operation: string, opti | |
| return span; | ||
| } | ||
|
|
||
| type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator<unknown> }; | ||
|
|
||
| function isAsyncIterable(value: unknown): value is AsyncIterableStream { | ||
| return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function'; | ||
| } | ||
|
|
||
| /** | ||
| * For a streaming `create({ stream: true })` the result is a `Stream` the caller consumes later. We can't | ||
| * swap what `create` returns, but the `Stream` in `data.result` is the same instance the caller holds and | ||
| * `asyncEnd` fires before the caller iterates — so we patch its async iterator in place to run through | ||
| * `instrumentStream`, which accumulates the streamed attributes and ends the span when iteration finishes. | ||
| * Only a streaming call resolves to an async-iterable, so that check alone distinguishes it. Returns `true` | ||
| * to hand span-ending ownership to `instrumentStream`; `false` for non-streaming/errored results, which end | ||
| * via the normal `beforeSpanEnd` path. | ||
| */ | ||
| function wrapStreamResult(span: Span, data: OpenAiChatChannelContext, options: OpenAiOptions): boolean { | ||
| const result = data.result; | ||
| if (!isAsyncIterable(result)) { | ||
| return false; | ||
| } | ||
|
|
||
| const { recordOutputs } = resolveAIRecordingOptions(options); | ||
| const iterate = result[Symbol.asyncIterator].bind(result); | ||
| const instrumented = instrumentStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs ?? false); | ||
| result[Symbol.asyncIterator] = () => instrumented; | ||
|
|
||
| return true; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stream errors leave span okMedium Severity When Reviewed by Cursor Bugbot for commit 9ca6a54. Configure here. |
||
| } | ||
|
|
||
| /** | ||
| * EXPERIMENTAL — orchestrion-driven OpenAI integration. Subscribes to the `orchestrion:openai:*` | ||
| * diagnostics_channels injected into `openai`'s `create` methods, so it requires the orchestrion runtime | ||
| * hook or bundler plugin. Covers non-streaming `chat.completions.create` and `responses.create`. | ||
| * diagnostics_channels injected into `openai`'s `create` methods (chat completions, responses, embeddings, | ||
| * conversations), so it requires the orchestrion runtime hook or bundler plugin. | ||
| */ | ||
| export const openaiChannelIntegration = defineIntegration(_openaiChannelIntegration); | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open streaming spans never end
Medium Severity
When
wrapStreamResultdefers span completion forstream: true, the span is only finished insideinstrumentStream'sfinallyblock after async iteration. If the caller resolvescreate()but never drives the returnedStream(nofor await, abandoned reference, or fire-and-forget),deferSpanEndskips the normalspan.end()path and the gen_ai span can remain open indefinitely.Reviewed by Cursor Bugbot for commit 115b544. Configure here.