[python][ray] Add no-shuffle bucket join for co-bucketed tables#8397
Conversation
Add pypaimon.ray.bucket_join: a no-shuffle join for two fixed-bucket (HASH_FIXED) tables that share a bucket count and bucket-key, joined on that key. Each bucket is read and joined locally in its own Ray task, so equal keys co-locate by bucket and there is no global shuffle -- the alternative to ray.data.join for a co-bucketed url->row_id locator table vs a per-task input table (data-evolution main tables are bucket-unaware, so bucketing is borrowed via the lightweight locator table). Validates matching bucket count / bucket-key / join-key; rejects otherwise.
…lits Plan each side's manifest a single time in the driver and group splits by bucket, then hand each bucket task only its own splits (splits pickle to Ray cleanly), instead of re-planning the full manifest inside every bucket task. Only buckets present on both sides are dispatched (inner join).
Cover the remaining reject paths besides bucket-count: different bucket-key, and a join key that is not the bucket-key.
…r join Avoid driver OOM at scale: return ray.data.from_arrow_refs(refs) so each bucket's join output stays a distributed object ref, instead of ray.get-ing every bucket into the driver and rebuilding with from_arrow (which would pull hundreds of GB of matched rows onto one node). Also restrict join_type to 'inner': the per-bucket intersection is only correct for inner joins (an outer join needs the union of buckets), so reject others with a clear error instead of silently returning wrong results.
…er worker - No shared bucket now returns an empty Dataset that keeps the join schema (join two empty reads) instead of a schema-less from_items([]). - Per-process table cache reused across a worker's bucket reads, so the catalog is not reloaded per bucket; planning still loads a fresh table. - Document the no-overlapping-column assumption in the docstring; note that planning is driver-side metadata only. - Tests: fan-out (one url -> many row_ids) and empty-result-keeps-schema.
Bucket ids are per-partition, so grouping splits by bucket alone would join rows across partitions. Reject partitioned tables up front (before any read) until (partition, bucket) grouping is implemented.
- Resolve bucket keys via table_schema.bucket_keys instead of the raw bucket-key option, so primary-key tables that bucket by their PK (no explicit bucket-key) are accepted instead of wrongly rejected. - Validate up front that each projection keeps the join key and that the two sides share no non-key columns, instead of failing deep inside a Ray task with an opaque pyarrow error. - Cover PK default bucket-key, projection-missing-key, and column collision.
Add a Bucket Join section to the Ray Data page: usage, parameters, return type, and the co-bucketing / inner-only / non-partitioned constraints.
…orker cache Address review findings: - Reject mismatched bucket-function.type (default/mod/hive hash the same key to different buckets, so co-location is not guaranteed). - Reject bucket-key columns whose types differ across sides (same name but INT vs BIGINT can hash apart and silently drop matches). - Key the per-worker table cache by schema id and guard it with a lock, so a schema change invalidates a stale cached table instead of mis-decoding. - Docs: note bucket-key type equality, projection-only column disambiguation, per-side snapshot, and skew/memory per bucket. - Tests: per-bucket dispatch (no shuffle), composite key, schema-id cache keying, bucket-function and key-type rejects.
A rescaled table (ALTER bucket, not yet rewritten) keeps old files under a different total_buckets. Splits carry only bucket, so the same bucket id across two bucket spaces would falsely co-locate and silently drop matches. Reject at plan time (via manifest entries' total_buckets), mirroring Spark's consistent-bucket guard. Also compare bucket-key types ignoring nullability (a present key hashes the same regardless), so a NOT NULL vs nullable key no longer over-rejects.
…le schema - Cache miss loaded the latest schema but stored it under the requested schema id; now verify the loaded schema id matches and fail fast if the schema moved after planning (a stale split plan), instead of reading with the wrong schema. - Pin the rescale guard and the split plan to a single snapshot, so a commit landing between the two manifest reads cannot slip stale-bucket files past the guard. - Compare bucket-key types ignoring nullability (already in prior commit). - Tests updated for the fail-fast cache path and a written rescale table.
Hoist the nested _key_type helper to module level; a nested def with no preceding blank line tripped E301.
| left_by_bucket = _plan_splits_by_bucket(left, catalog_options, left_projection, lcount) | ||
| right_by_bucket = _plan_splits_by_bucket(right, catalog_options, right_projection, rcount) | ||
|
|
||
| l_schema_id, r_schema_id = ltable.table_schema.id, rtable.table_schema.id |
There was a problem hiding this comment.
l_schema_id and r_schema_id come from the tables loaded before _plan_splits_by_bucket, but the split plans are built from fresh table instances inside _plan_splits_by_bucket. If a schema evolution lands after the initial cat.get_table(...) calls but before planning, the planner will use the new schema/snapshot while the workers are still asked to validate against the old schema id, so _get_table(..., schema_id) raises schema changed during bucket_join even though the split plan itself is current. Please return the planned schema id (or the planned table/read type) from the same table instance used to build each side split plan and pass that to the workers.
There was a problem hiding this comment.
l_schema_idandr_schema_idcome from the tables loaded before_plan_splits_by_bucket, but the split plans are built from fresh table instances inside_plan_splits_by_bucket. If a schema evolution lands after the initialcat.get_table(...)calls but before planning, the planner will use the new schema/snapshot while the workers are still asked to validate against the old schema id, so_get_table(..., schema_id)raisesschema changed during bucket_joineven though the split plan itself is current. Please return the planned schema id (or the planned table/read type) from the same table instance used to build each side split plan and pass that to the workers.
Thanks, fixed
…kers Review (JingsongLi): l_schema_id/r_schema_id were read from the tables loaded before planning, but each split plan is built from a fresh table instance inside _plan_splits_by_bucket. A schema evolution between the two loads made workers validate against the pre-plan schema id and falsely raise 'schema changed' though the plan itself was current. _plan_splits_by_bucket now returns the schema id of the very table instance it planned with; pass that to workers.
Review (cursor): _plan_splits_by_bucket pinned scan.snapshot-id but left an explicit table scan.mode (e.g. latest-full) in place; TableScan's whitelist validation rejects that pairing, so planning failed on otherwise-valid co-bucketed tables. Strip scan.mode and other point-in-time scan options before setting snapshot-id (resolves to a clean snapshot-id time-travel). Add a regression test with scan.mode=latest-full.
…read - Raise a clear error if ray.data.from_arrow_refs is missing, instead of an opaque AttributeError on very old Ray. - _plan_splits_by_bucket read the manifest twice (rescale guard + split plan); reuse the entries from the guard so scan.plan() doesn't re-read (verified: read_manifest_entries goes 2 -> 1 for append/pk tables).
Use `lambda: entries` instead of `lambda *a, **k: entries`: plan_files() is arg-less today, and dropping the arg swallow makes a future signature change fail loudly instead of silently returning stale entries.
|
+1 |
Purpose
Tests