refactor: make file-statistics cache keys schema-aware#23201
refactor: make file-statistics cache keys schema-aware#23201Phoenix500526 wants to merge 8 commits into
Conversation
44b33de to
a13e269
Compare
|
I am going to run the |
|
run bechmark wide_schema env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
|
run bechmark wide_schema |
|
run bechmark wide_schema baseline:
env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
This is cool, i did not know this. |
|
run benchmark wide_schema env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
|
run benchmark wide_schema baseline:
env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
|
run benchmark wide_schema |
Yep the idea here is to run baseline w/o cache and this branch w/ cache. Orthogonal to this PR but I want to see how it looks like. Unfortunately I had a typo in benchmark, sorry for the noise. |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (a13e269) to ff677c4 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (a13e269) to ff677c4 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (a13e269) to ff677c4 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
Unfortunately we have regressions in the benchmarks: |
|
run benchmark wide_schema baseline:
env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
|
run benchmark wide_schema |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (9618f88) to ff677c4 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (9618f88) to ff677c4 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
File statistics are computed against a specific `file_schema`, but the file-statistics cache was keyed only by table and path. Reading the same path under a different schema could reuse statistics whose `column_statistics` no longer line up, panicking during statistics projection. apache#22950 worked around this by bypassing the cache entirely for anonymous explicit-schema reads, at the cost of losing cache reuse for them. Introduce a `SchemaFingerprint` (per-column name, type and nullability, derived from `file_schema`) and a `FileStatisticsCacheKey { table, path, schema }`, and key the file-statistics cache on it. Different schemas now get distinct entries (no stale cross-schema reuse) while a repeated read of the same schema reuses its entry, so the apache#22950 bypass is removed and anonymous explicit-schema reads cache safely again. - The fingerprint excludes field/schema metadata (cannot affect statistics) and partition columns (their statistics are computed separately). - Table-drop invalidation is unchanged: drop_table_entries matches on CacheKey::table_ref(), which still returns the table, so all schema variants for a table are removed together. - The list-files cache continues to key on TableScopedPath. Closes apache#23072. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
…apSize Add a `DFHeapSize` impl for 3-tuples (mirroring the existing 2-tuple one) so `Vec<(String, DataType, bool)>` accounts for its heap automatically, letting `SchemaFingerprint::heap_size` delegate to it instead of computing the size by hand. Also update the `test_statistics_cache` unit test to key on `FileStatisticsCacheKey` so it matches the real file-statistics cache. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
|
@kosiew Do you maybe have time for a second opinion? |
There was a problem hiding this comment.
@Phoenix500526
Thanks for the fix. I think the schema-aware cache key is the right direction, but I think the implementation can be simplified a bit before this lands.
| /// nullability, in order. It deliberately excludes field/schema metadata, which | ||
| /// cannot affect statistics — including it would needlessly fragment the cache. | ||
| #[derive(Clone, Debug)] | ||
| pub struct SchemaFingerprint { |
There was a problem hiding this comment.
The schema-aware key looks correct, and I think this fixes the bug. That said, the implementation feels a bit more involved than this cache path needs.
Could we simplify SchemaFingerprint to a small derived newtype over something like Vec<(String, DataType, bool)>, or an equivalent representation, and rely on derived Hash and Eq? The precomputed hash plus custom PartialEq collision handling adds some cleverness that feels hard to justify here unless profiling shows schema-key hashing is material.
There was a problem hiding this comment.
If you have a look at the benchmarks results clickbench_partitioned and wide_schema you will find that without this optimization there are real regressions.
There was a problem hiding this comment.
After adding the schema key hash, the performance regression was indeed fixed. I originally wanted to profile the change to determine whether schema key hashing is on the hot path, but my local machine is not powerful enough and the test takes too long to complete, so I’m unable to continue profiling it for now.
There was a problem hiding this comment.
have a look at the benchmarks results clickbench_partitioned and wide_schema you will find that without this optimization there are real regressions.
Sorry I missed this.
I wonder whether it would be viable to keep TableScopedPath as the cache key and store the schema fingerprint in the cached value too? The lookup would be:
- look up by the existing
{table, path}key; - validate file metadata as today (
size,last_modified); - also validate
cached.schema_fingerprint == current_schema_fingerprint; - if either validation fails, treat it as a miss and overwrite the entry for that
{table, path}.
This still fixes the bug: stats computed for one schema cannot be reused for another schema. It also preserves cache reuse for repeated anonymous explicit-schema reads with the same schema, which is the improvement over #22950.
The tradeoff is that we would only keep one schema's stats per {table, path}, so alternating schemas for the same file may recompute. That seems acceptable to me because the previous fallback was to skip caching for this problematic case entirely, and a full multi-schema cache feels more complex than this issue needs.
Benchmark-wise, this may improve Q6 if the regression is primarily from schema-aware cache keys, because the key hash/probe returns to the old cheap {table, path} shape. We would still need to verify with clickbench_partitioned, since value-side validation still has per-hit cost: the current schema fingerprint must exist and cache hits still need schema validation/fingerprint comparison. For wide schemas, the precomputed fingerprint hash can still keep that validation cheap. Memory/eviction should also improve because there is at most one entry per {table, path} rather than one per {table, path, schema}.
If we take this approach, FileStatisticsCache can likely remain keyed by TableScopedPath, so the 55.0.0 upgrade-guide entry for changing the public cache key type can be removed or reduced to any smaller CachedFileMetadata API change.
There was a problem hiding this comment.
It's worth a shot and it's already in commit #985b8f374. Could you help run a benchmark to see what's going on? Thanks 😊 @kosiew
| fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { | ||
| self.path.as_ref().heap_size(ctx) | ||
| + self.table.heap_size(ctx) | ||
| + self.schema.as_ref().heap_size(ctx) |
There was a problem hiding this comment.
FileStatisticsCacheKey::heap_size appears to deep-count the shared SchemaFingerprint for every cached file key. Since ListingTable now shares one Arc<SchemaFingerprint> across all files for the same table and schema, this could overstate cache memory for wide schemas with many files and lead to earlier eviction than necessary.
Could we count only the incremental cache-owned cost here, or add a small test that documents the intended accounting tradeoff?
There was a problem hiding this comment.
Yes, it makes sense. Since the schema is shared through an Arc, and the heap allocation behind it is shared by both the cache keys and the ListingTable, I chose to skip counting its heap size.
FileStatisticsCacheKey stores a shared Arc to SchemaFingerprint. Do not charge the fingerprint heap allocation to each file key, because that overstates memory use for wide schemas and causes early eviction. Add a regression test to keep schema width out of per-key sizing. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
Keep the file-statistics cache keyed by table and path while storing the file-schema fingerprint in the cached metadata value. This preserves schema correctness without changing the public cache key type. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
run benchmark clickbench_partitioned |
|
run benchmark wide_schema baseline:
env:
DATAFUSION_RUNTIME_FILE_STATISTICS_CACHE_LIMIT: 0 |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (985b8f3) to d58e0c6 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (985b8f3) to d58e0c6 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (985b8f3) to d58e0c6 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
There was a problem hiding this comment.
Thanks for working through the earlier feedback. I think the overall direction is much better, especially keeping TableScopedPath as the cache key and moving schema validation into the cached value. The remaining concern is performance.
Although the schema fingerprint hash is now precomputed, successful cache hits still perform an O(schema width) comparison. Given the reported wide_schema and clickbench_partitioned results, I think it would be good to make the common cache hit path O(1) before merging.
| ) -> bool { | ||
| self.meta.size == current_meta.size | ||
| && self.meta.last_modified == current_meta.last_modified | ||
| && self.schema_fingerprint.as_ref() == current_schema_fingerprint |
There was a problem hiding this comment.
This is much closer to what I had in mind, but I think there is still one remaining performance issue.
SchemaFingerprint::eq first compares the precomputed hash and then falls back to comparing every column. On a successful cache hit, the hashes match, so we still end up walking the entire Vec<(String, DataType, bool)>. That means schema validation on cache hits is still O(schema width).
The precomputed hash helps reject different schemas cheaply, but it does not make the common successful validation path O(1). That also lines up with the benchmark results. wide_schema is still slightly slower and clickbench_partitioned Q6 still shows a noticeable regression.
Could we make successful validation constant time as well? For example, adding a pointer identity fast path for shared Arc<SchemaFingerprint> values would let the common case avoid the deep comparison while still preserving collision safety.
There was a problem hiding this comment.
Good point. I added an Arc::ptr_eq fast path for the shared-fingerprint case, while keeping exact equality as a fallback for equivalent fingerprints from different Arc allocations. This keeps collision safety and existing cache reuse behavior, but makes the common successful validation path O(1).
Use Arc pointer identity to avoid deep schema comparison on common file statistics cache hits. Keep exact equality as a fallback for equivalent fingerprints built from different Arc allocations. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
|
run benchmark clickbench_partitioned |
|
run benchmark wide_schema |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (ec13f59) to d58e0c6 (merge-base) diff using: wide_schema File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (ec13f59) to d58e0c6 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagewide_schema — base (merge-base)
wide_schema — branch
File an issue against this benchmark runner |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing issue/23072 (ec13f59) to d58e0c6 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
from: there is no consistent reproducible performance regression |
kosiew
left a comment
There was a problem hiding this comment.
@Phoenix500526
Thanks for your persistence in this.
Looks 👍 to me
Which issue does this PR close?
Rationale for this change
File statistics are computed against a specific
file_schema(theircolumn_statisticsare positional, one per column), but the file-statisticscache was keyed only by table and path. Reading the same path under a different
schema could therefore reuse statistics whose columns no longer line up,
panicking during statistics projection.
#22950 worked around this by bypassing the file-statistics cache entirely for
anonymous explicit-schema reads — correct, but it gave up cache reuse for them
(every such read recomputes statistics). #23072 asks to make the cache itself
schema-aware so those reads can reuse the cache safely instead of skipping it.
What changes are included in this PR?
SchemaFingerprint— the per-column(name, data_type, nullable)of afile_schema, in order — andFileStatisticsCacheKey { table, path, schema },and key the file-statistics cache on it (
FileStatisticsCacheis nowdyn Cache<FileStatisticsCacheKey, CachedFileMetadata>).ListingTable::do_collect_statistics_and_orderingbuilds the key with thefile_schemafingerprint and uses the shared cache directly. The fix: isolate anonymous file statistics cache #22950bypass (
statistics_cachehelper /schema_source-based skip) is removed:different schemas now land in distinct entries (no stale cross-schema reuse),
while a repeated read of the same schema reuses its entry.
affect statistics, and including it would needlessly fragment the cache) and
partition columns (partition statistics are computed separately, outside this
cache).
drop_table_entriesmatches onCacheKey::table_ref(), which still returns the table, so all schema variantsfor a dropped table are removed together.
TableScopedPath.Are these changes tested?
Yes.
(
anonymous_parquet_stats_cache_with_explicit_wider_schema): the widerexplicit-schema read now lands in its own cache entry (2 entries, was 1 under
the bypass) with correct statistics and no panic, and a repeated read of that
schema is served from the cache (a cache hit, no new entry).
SchemaFingerprint: it distinguishes nullability andfield order, and ignores field/schema metadata.
cargo testfor thefile_statisticsintegration module and thedatafusion-executioncache tests (includingdrop_table_entries) pass, alongwith
cargo fmt --allandcargo clippy --all-targets --all-features -- -D warningsfor the touched crates.Are there any user-facing changes?
No change to query results, physical plans, or the serialized (proto) wire
format; file statistics are computed exactly as before.
One public API change (please add the
api changelabel): theFileStatisticsCachetype alias now usesFileStatisticsCacheKeyinstead ofTableScopedPathas its key. Code that constructed keys for this cache directlymust switch to
FileStatisticsCacheKey.SchemaFingerprintandFileStatisticsCacheKeyare newly public;TableScopedPathremains (still usedby the list-files cache).
cargo-semver-checkswill flag the key-type change,which is expected.