diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java index 92e88bac9f..f4ed350e3a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -202,6 +202,7 @@ public void resetDictionary() { lastUsedDictionaryByteSize = 0; lastUsedDictionarySize = 0; dictionaryTooBig = false; + dictionaryByteSize = 0; clearDictionaryContent(); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java index 7f56ef2192..41fe484f37 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java @@ -105,6 +105,11 @@ public void reset() { rawDataByteSize = 0; firstPage = false; currentWriter.reset(); + // After a fallback, currentWriter is the fallback writer, so reset the initial dictionary writer explicitly; + // otherwise it is never reset at row-group boundaries, which can silently corrupt the next row group + if (currentWriter != initialWriter) { + initialWriter.reset(); + } } @Override @@ -124,10 +129,12 @@ public DictionaryPage toDictPageAndClose() { @Override public void resetDictionary() { - if (initialUsedAndHadDictionary) { + currentWriter.resetDictionary(); + // After a fallback, currentWriter is the fallback writer, so also reset the initial dictionary writer's + // dictionary; otherwise it is never reset at row-group boundaries, leaving stale entries/IDs that can silently + // corrupt the next row group + if (currentWriter != initialWriter) { initialWriter.resetDictionary(); - } else { - 
currentWriter.resetDictionary(); } currentWriter = initialWriter; fellBackAlready = false; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index c7cf351990..06f1499b7b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -285,6 +285,67 @@ public void testSecondPageFallBack() throws IOException { } } + @Test + public void testDictionaryWriterReusableAfterFallBack() throws IOException { + int count = 1000; + try (final FallbackValuesWriter cw = + newPlainBinaryDictionaryValuesWriter(1000, 10000)) { + + // --- Row group 1 --- + // First page is dictionary encoded and committed, which keeps the dictionary alive for + // the whole row group. + writeRepeated(count, cw, "a"); + getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY); + + // Second page no longer fits the dictionary so it falls back to plain. The current writer + // becomes the fallback writer, while the dictionary writer still buffers this page's ids. + writeDistinct(count, cw, "b"); + getBytesAndCheckEncoding(cw, PLAIN); + + // End of row group 1: emit the dictionary page and reset the dictionary state for reuse. + Assert.assertNotNull(cw.toDictPageAndClose()); + cw.resetDictionary(); + + // --- Row group 2 --- + // The dictionary writer must be clean again + writeRepeated(count, cw, "c"); + BytesInput rg2Bytes = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY); + + // The page must decode back to exactly the values written in row group 2. 
+ DictionaryValuesReader cr = initDicReader(cw, BINARY); + checkRepeated(count, rg2Bytes, cr, "c"); + } + } + + @Test + public void testDictionaryWriterReusableAfterFirstPageFallBack() throws IOException { + int count = 1000; + try (final FallbackValuesWriter cw = + newPlainBinaryDictionaryValuesWriter(10000, 10000)) { + + // --- Row group 1 --- + // The very first page falls back to plain because dictionary encoding is not efficient. Because the + // fallback happens on the first page, the dictionary was never committed as the page encoding, so + // initialUsedAndHadDictionary stays false and the current writer becomes the fallback writer. The + // dictionary writer, however, still holds this page's entries and byte size. + writeDistinct(count, cw, "a"); + getBytesAndCheckEncoding(cw, PLAIN); + + // End of row group 1: reset the dictionary state for reuse + cw.resetDictionary(); + + // --- Row group 2 --- + // The data is now dictionary friendly, so it must be dictionary encoded again. Without a clean initial + // dictionary writer, the stale entries/byte size from row group 1 would push this page back to plain. + writeRepeated(count, cw, "b"); + BytesInput rg2Bytes = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY); + + // The page must decode back to exactly the values written in row group 2. + DictionaryValuesReader cr = initDicReader(cw, BINARY); + checkRepeated(count, rg2Bytes, cr, "b"); + } + } + @Test public void testLongDictionary() throws IOException { int COUNT = 1000;