Skip to content

[python][ray] Add no-shuffle bucket join for co-bucketed tables#8397

Merged
JingsongLi merged 17 commits into
apache:masterfrom
XiaoHongbo-Hope:support_bucket_join
Jul 3, 2026
Merged

[python][ray] Add no-shuffle bucket join for co-bucketed tables#8397
JingsongLi merged 17 commits into
apache:masterfrom
XiaoHongbo-Hope:support_bucket_join

Conversation

@XiaoHongbo-Hope

Copy link
Copy Markdown
Contributor

Purpose

Tests

Add pypaimon.ray.bucket_join: a no-shuffle join for two fixed-bucket (HASH_FIXED)
tables that share a bucket count and bucket-key, joined on that key. Each bucket
is read and joined locally in its own Ray task, so equal keys co-locate by bucket
and there is no global shuffle -- the alternative to ray.data.join for a co-bucketed
url->row_id locator table vs a per-task input table (data-evolution main tables are
bucket-unaware, so bucketing is borrowed via the lightweight locator table).

Validates matching bucket count / bucket-key / join-key; rejects otherwise.
…lits

Plan each side's manifest a single time in the driver and group splits by bucket,
then hand each bucket task only its own splits (splits pickle to Ray cleanly),
instead of re-planning the full manifest inside every bucket task. Only buckets
present on both sides are dispatched (inner join).
Cover the remaining reject paths besides bucket-count: different bucket-key, and
a join key that is not the bucket-key.
…r join

Avoid driver OOM at scale: return ray.data.from_arrow_refs(refs) so each bucket's
join output stays a distributed object ref, instead of ray.get-ing every bucket
into the driver and rebuilding with from_arrow (which would pull hundreds of GB
of matched rows onto one node).

Also restrict join_type to 'inner': the per-bucket intersection is only correct
for inner joins (an outer join needs the union of buckets), so reject others with
a clear error instead of silently returning wrong results.
…er worker

- No shared bucket now returns an empty Dataset that keeps the join schema
  (join two empty reads) instead of a schema-less from_items([]).
- Per-process table cache reused across a worker's bucket reads, so the
  catalog is not reloaded per bucket; planning still loads a fresh table.
- Document the no-overlapping-column assumption in the docstring; note that
  planning is driver-side metadata only.
- Tests: fan-out (one url -> many row_ids) and empty-result-keeps-schema.
Bucket ids are per-partition, so grouping splits by bucket alone would
join rows across partitions. Reject partitioned tables up front (before
any read) until (partition, bucket) grouping is implemented.
- Resolve bucket keys via table_schema.bucket_keys instead of the raw
  bucket-key option, so primary-key tables that bucket by their PK (no
  explicit bucket-key) are accepted instead of wrongly rejected.
- Validate up front that each projection keeps the join key and that the
  two sides share no non-key columns, instead of failing deep inside a
  Ray task with an opaque pyarrow error.
- Cover PK default bucket-key, projection-missing-key, and column
  collision.
Add a Bucket Join section to the Ray Data page: usage, parameters,
return type, and the co-bucketing / inner-only / non-partitioned
constraints.
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review July 2, 2026 17:00
…orker cache

Address review findings:
- Reject mismatched bucket-function.type (default/mod/hive hash the same key
  to different buckets, so co-location is not guaranteed).
- Reject bucket-key columns whose types differ across sides (same name but
  INT vs BIGINT can hash apart and silently drop matches).
- Key the per-worker table cache by schema id and guard it with a lock, so a
  schema change invalidates a stale cached table instead of mis-decoding.
- Docs: note bucket-key type equality, projection-only column disambiguation,
  per-side snapshot, and skew/memory per bucket.
- Tests: per-bucket dispatch (no shuffle), composite key, schema-id cache
  keying, bucket-function and key-type rejects.
A rescaled table (ALTER bucket, not yet rewritten) keeps old files under a
different total_buckets. Splits carry only bucket, so the same bucket id
across two bucket spaces would falsely co-locate and silently drop matches.
Reject at plan time (via manifest entries' total_buckets), mirroring Spark's
consistent-bucket guard.

Also compare bucket-key types ignoring nullability (a present key hashes the
same regardless), so a NOT NULL vs nullable key no longer over-rejects.
…le schema

- Cache miss loaded the latest schema but stored it under the requested
  schema id; now verify the loaded schema id matches and fail fast if the
  schema moved after planning (a stale split plan), instead of reading with
  the wrong schema.
- Pin the rescale guard and the split plan to a single snapshot, so a commit
  landing between the two manifest reads cannot slip stale-bucket files past
  the guard.
- Compare bucket-key types ignoring nullability (already in prior commit).
- Tests updated for the fail-fast cache path and a written rescale table.
Hoist the nested _key_type helper to module level; a nested def with no
preceding blank line tripped E301.

@QuakeWang QuakeWang left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

left_by_bucket = _plan_splits_by_bucket(left, catalog_options, left_projection, lcount)
right_by_bucket = _plan_splits_by_bucket(right, catalog_options, right_projection, rcount)

l_schema_id, r_schema_id = ltable.table_schema.id, rtable.table_schema.id

@JingsongLi JingsongLi Jul 3, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

l_schema_id and r_schema_id come from the tables loaded before _plan_splits_by_bucket, but the split plans are built from fresh table instances inside _plan_splits_by_bucket. If a schema evolution lands after the initial cat.get_table(...) calls but before planning, the planner will use the new schema/snapshot while the workers are still asked to validate against the old schema id, so _get_table(..., schema_id) raises schema changed during bucket_join even though the split plan itself is current. Please return the planned schema id (or the planned table/read type) from the same table instance used to build each side split plan and pass that to the workers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

l_schema_id and r_schema_id come from the tables loaded before _plan_splits_by_bucket, but the split plans are built from fresh table instances inside _plan_splits_by_bucket. If a schema evolution lands after the initial cat.get_table(...) calls but before planning, the planner will use the new schema/snapshot while the workers are still asked to validate against the old schema id, so _get_table(..., schema_id) raises schema changed during bucket_join even though the split plan itself is current. Please return the planned schema id (or the planned table/read type) from the same table instance used to build each side split plan and pass that to the workers.

Thanks, fixed

…kers

Review (JingsongLi): l_schema_id/r_schema_id were read from the tables loaded
before planning, but each split plan is built from a fresh table instance inside
_plan_splits_by_bucket. A schema evolution between the two loads made workers
validate against the pre-plan schema id and falsely raise 'schema changed'
though the plan itself was current. _plan_splits_by_bucket now returns the
schema id of the very table instance it planned with; pass that to workers.
Review (cursor): _plan_splits_by_bucket pinned scan.snapshot-id but left an
explicit table scan.mode (e.g. latest-full) in place; TableScan's whitelist
validation rejects that pairing, so planning failed on otherwise-valid
co-bucketed tables. Strip scan.mode and other point-in-time scan options before
setting snapshot-id (resolves to a clean snapshot-id time-travel). Add a
regression test with scan.mode=latest-full.
…read

- Raise a clear error if ray.data.from_arrow_refs is missing, instead of an
  opaque AttributeError on very old Ray.
- _plan_splits_by_bucket read the manifest twice (rescale guard + split plan);
  reuse the entries from the guard so scan.plan() doesn't re-read (verified:
  read_manifest_entries goes 2 -> 1 for append/pk tables).
Use `lambda: entries` instead of `lambda *a, **k: entries`: plan_files() is
arg-less today, and dropping the arg swallow makes a future signature change
fail loudly instead of silently returning stale entries.
@JingsongLi

Copy link
Copy Markdown
Contributor

+1

@JingsongLi JingsongLi merged commit 7dc65b9 into apache:master Jul 3, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants