From 00a435159773c4d5e4007fe158b33015c9190c5f Mon Sep 17 00:00:00 2001 From: Max Parke Date: Mon, 29 Jun 2026 15:36:15 -0400 Subject: [PATCH] test(tutorials): fix flaky streaming test that broke on first terminal event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_send_event_and_stream_with_reasoning broke out of the stream loop on the first `done` event. A single turn emits several messages — user echo, reasoning, agent text — each ending in a `full` or `done` event. When a reasoning message's terminal event arrived before the agent's text `done`, the loop exited with agent_response_found still False, failing at the assertion ("Agent response not found in stream") rather than timing out. The failure signature confirms this: an AssertionError (loop broke early), not a TimeoutError (latency). Consume terminal events until both the user echo and the agent's text reply are seen, keying off message content rather than the first terminal signal. Test-side only: the producer (streaming.py) correctly emits a terminal event per message, and real consumers key those events by message id; the producer was just repaired in #449, so the fix stays in the test. Co-Authored-By: Claude Opus 4.8 --- .../010_agent_chat/tests/test_agent.py | 52 +++++++------------ 1 file changed, 19 insertions(+), 33 deletions(-) diff --git a/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py index acbb88754..f8c3e1aa8 100644 --- a/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py @@ -240,42 +240,28 @@ async def stream_messages() -> None: task_id=task.id, timeout=90, # Increased timeout for CI environments ): + # A turn emits several messages (user echo, reasoning, agent text), + # each ending in "full" or "done"; consume until the text reply lands. 
msg_type = event.get("type") if msg_type == "full": - task_message_update = StreamTaskMessageFull.model_validate(event) - if task_message_update.parent_task_message and task_message_update.parent_task_message.id: - finished_message = await client.messages.retrieve(task_message_update.parent_task_message.id) - if ( - finished_message.content - and finished_message.content.type == "text" - and finished_message.content.author == "user" - ): - user_message_found = True - elif ( - finished_message.content - and finished_message.content.type == "text" - and finished_message.content.author == "agent" - ): - agent_response_found = True - elif finished_message.content and finished_message.content.type == "reasoning": - reasoning_found = True - - # Exit early if we have what we need - if user_message_found and agent_response_found: - break - + parent_task_message = StreamTaskMessageFull.model_validate(event).parent_task_message elif msg_type == "done": - task_message_update_done = StreamTaskMessageDone.model_validate(event) - if task_message_update_done.parent_task_message and task_message_update_done.parent_task_message.id: - finished_message = await client.messages.retrieve(task_message_update_done.parent_task_message.id) - if finished_message.content and finished_message.content.type == "reasoning": - reasoning_found = True - elif ( - finished_message.content - and finished_message.content.type == "text" - and finished_message.content.author == "agent" - ): - agent_response_found = True + parent_task_message = StreamTaskMessageDone.model_validate(event).parent_task_message + else: + continue + + if parent_task_message and parent_task_message.id: + finished_message = await client.messages.retrieve(parent_task_message.id) + content = finished_message.content + if content and content.type == "text" and content.author == "user": + user_message_found = True + elif content and content.type == "text" and content.author == "agent": + agent_response_found = True + elif content and 
content.type == "reasoning": + reasoning_found = True + + # Stop once both the user echo and the agent's text reply are seen. + if user_message_found and agent_response_found: break stream_task = asyncio.create_task(stream_messages())