Python: Process messages to an executor serially within a superstep#6776
Open
TaoChenOSU wants to merge 6 commits into
Open
Python: Process messages to an executor serially within a superstep#6776TaoChenOSU wants to merge 6 commits into
TaoChenOSU wants to merge 6 commits into
Conversation
Add a per-executor asyncio.Lock in Executor.execute so each executor processes its messages one at a time within a superstep, while preserving concurrency across distinct executors. Includes a regression test.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR addresses a workflow concurrency bug where a single target executor could process multiple inbound messages concurrently within the same superstep, potentially corrupting executor-local state. It does so by introducing per-executor serialization and adding a regression test.
Changes:
- Add a per-executor lock to serialize
Executor.execute()handler invocations. - Add a workflow test that reproduces concurrent cross-source delivery to the same target executor and asserts serialization.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| python/packages/core/agent_framework/_workflows/_executor.py | Adds per-executor execution serialization to prevent concurrent processing of messages delivered to the same executor within a superstep. |
| python/packages/core/tests/workflow/test_workflow.py | Adds a regression test ensuring concurrent deliveries to the same executor do not overlap. |
Contributor
Python Test Coverage Report •
Python Unit Test Overview
|
|||||||||||||||||||||||||||||||||||
asyncio.Lock created in Executor.__init__ would bind to the first event loop it was awaited under, so reusing an executor/workflow across loops (e.g. successive asyncio.run calls) raised 'bound to a different event loop'. Create the lock lazily via _get_execution_lock(), re-creating it when the running loop changes. Adds a loop-scoped lock test.
Like the per-executor lock, the runner context's asyncio.Queue bound to the first event loop it was awaited under, so reusing a workflow across loops (e.g. successive asyncio.run calls) raised 'bound to a different event loop'. Re-create the queue lazily via _get_event_queue() when the running loop changes. Adds an integration test reusing a workflow across event loops.
Initialize _event_queue to None and create it on first use in _get_event_queue, mirroring the per-executor lock. Avoids constructing a queue in __init__/reset_for_new_run that is immediately discarded once the running loop is known.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation & Context
Within a single superstep, the runner drains pending messages grouped by source executor and
asyncio.gathers the per-source deliveries concurrently. NeitherEdgeRunner.send_messagenorExecutor.executeheld any per-target lock, andExecutor.execute's first statement is anawait(emitting theexecutor_invokedevent). So when two different upstream executors target the same executor in one superstep, their handler invocations could interleave atawaitpoints — i.e. the same executor could process two messages concurrently. That breaks the expectation that an executor processes its inbox serially, and it can corrupt executor-local state (and, forWorkflowExecutor, trip the shared sub-workflow's concurrent-run guard).Description & Review Guide
asyncio.LocktoExecutor, acquired around the handler invocation inExecutor.execute(). Each executor now processes its messages one at a time.Executor.executeand the absence of re-entrancy/deadlock paths (handlers never re-enter their ownexecute;WorkflowExecutorruns the child on a separate executor set).Fixes:
This PR is the second PR for breaking down the changes in #6407 into smaller PRs.
Contribution Checklist
breaking changelabel (or add "[BREAKING]" to the title prefix, before or after any language prefix) — a workflow keeps the label and title prefix in sync automatically.