Skip to content

Python: Process messages to an executor serially within a superstep#6776

Open
TaoChenOSU wants to merge 6 commits into
mainfrom
feature/python-serial-message-delivery
Open

Python: Process messages to an executor serially within a superstep#6776
TaoChenOSU wants to merge 6 commits into
mainfrom
feature/python-serial-message-delivery

Conversation

@TaoChenOSU

@TaoChenOSU TaoChenOSU commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Motivation & Context

Within a single superstep, the runner drains pending messages grouped by source executor and asyncio.gathers the per-source deliveries concurrently. Neither EdgeRunner.send_message nor Executor.execute held any per-target lock, and Executor.execute's first statement is an await (emitting the executor_invoked event). So when two different upstream executors target the same executor in one superstep, their handler invocations could interleave at await points — i.e. the same executor could process two messages concurrently. That breaks the expectation that an executor processes its inbox serially, and it can corrupt executor-local state (and, for WorkflowExecutor, trip the shared sub-workflow's concurrent-run guard).

Description & Review Guide

  • What are the major changes?
    • Add a per-instance asyncio.Lock to Executor, acquired around the handler invocation in Executor.execute(). Each executor now processes its messages one at a time.
  • What is the impact of these changes?
    • Deliveries to the same executor within a superstep are serialized; messages from the same source remain in order (cross-source order is best-effort, which matches the fact that there is no well-defined global send order across sources within a superstep).
    • Concurrency across distinct executors is preserved (the lock is per-instance, not global), so fan-out parallelism (e.g. concurrent agent calls) is unaffected.
    • No public API change.
  • What do you want reviewers to focus on?
    • The lock placement in Executor.execute and the absence of re-entrancy/deadlock paths (handlers never re-enter their own execute; WorkflowExecutor runs the child on a separate executor set).

Fixes:
This PR is the second PR for breaking down the changes in #6407 into smaller PRs.

Contribution Checklist

  • The code builds clean without any errors or warnings
  • All unit tests pass, and I have added new tests where possible
  • The PR follows the Contribution Guidelines
  • This PR is linked to an issue and there is no other open PR for this issue (see Related Issue above).
  • This is not a breaking change. If it is a breaking change, add the breaking change label (or add "[BREAKING]" to the title prefix, before or after any language prefix) — a workflow keeps the label and title prefix in sync automatically.

Add a per-executor asyncio.Lock in Executor.execute so each executor processes its messages one at a time within a superstep, while preserving concurrency across distinct executors. Includes a regression test.
Copilot AI review requested due to automatic review settings June 26, 2026 19:01
@moonbox3 moonbox3 added the python Usage: [Issues, PRs], Target: Python label Jun 26, 2026

@github-actions github-actions Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review

Reviewers: 5 | Confidence: 94% | Result: All clear

Reviewed: Correctness, Security Reliability, Test Coverage, Failure Modes, Design Approach


Automated review by TaoChenOSU's agents

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a workflow concurrency bug where a single target executor could process multiple inbound messages concurrently within the same superstep, potentially corrupting executor-local state. It does so by introducing per-executor serialization and adding a regression test.

Changes:

  • Add a per-executor lock to serialize Executor.execute() handler invocations.
  • Add a workflow test that reproduces concurrent cross-source delivery to the same target executor and asserts serialization.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
python/packages/core/agent_framework/_workflows/_executor.py Adds per-executor execution serialization to prevent concurrent processing of messages delivered to the same executor within a superstep.
python/packages/core/tests/workflow/test_workflow.py Adds a regression test ensuring concurrent deliveries to the same executor do not overlap.

Comment thread python/packages/core/agent_framework/_workflows/_executor.py
Comment thread python/packages/core/agent_framework/_workflows/_executor.py Outdated
Comment thread python/packages/core/tests/workflow/test_workflow.py Outdated
@github-actions

github-actions Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
packages/core/agent_framework/_workflows
   _executor.py1951094%224, 363, 365, 374, 394, 397, 504, 509, 519, 678
   _runner_context.py1671591%66, 80–81, 83–84, 86, 395, 414, 423, 426–428, 469, 482, 486
TOTAL42582508888% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
8342 37 💤 0 ❌ 0 🔥 2m 9s ⏱️

asyncio.Lock created in Executor.__init__ would bind to the first event loop it was awaited under, so reusing an executor/workflow across loops (e.g. successive asyncio.run calls) raised 'bound to a different event loop'. Create the lock lazily via _get_execution_lock(), re-creating it when the running loop changes. Adds a loop-scoped lock test.
Like the per-executor lock, the runner context's asyncio.Queue bound to the first event loop it was awaited under, so reusing a workflow across loops (e.g. successive asyncio.run calls) raised 'bound to a different event loop'. Re-create the queue lazily via _get_event_queue() when the running loop changes. Adds an integration test reusing a workflow across event loops.
Initialize _event_queue to None and create it on first use in _get_event_queue, mirroring the per-executor lock. Avoids constructing a queue in __init__/reset_for_new_run that is immediately discarded once the running loop is known.
@TaoChenOSU TaoChenOSU marked this pull request as ready for review June 26, 2026 22:40
@TaoChenOSU TaoChenOSU self-assigned this Jun 26, 2026
@TaoChenOSU TaoChenOSU added the workflows Usage: [Issues, PRs], Target: Workflows label Jun 26, 2026
@TaoChenOSU TaoChenOSU moved this to In Review in Agent Framework Jun 26, 2026

@github-actions github-actions Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review

Reviewers: 5 | Confidence: 78% | Result: All clear

Reviewed: Correctness, Security Reliability, Test Coverage, Failure Modes, Design Approach


Automated review by TaoChenOSU's agents

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

python Usage: [Issues, PRs], Target: Python workflows Usage: [Issues, PRs], Target: Workflows

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

3 participants