shangxinli commented on PR #14435:
URL: https://github.com/apache/iceberg/pull/14435#issuecomment-3582429516
Summary of Changes
1. Added _last_updated_sequence_number Support for Row Lineage (pvary's
comment)
- File: parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
- Extended row lineage support to cover both the _row_id and _last_updated_sequence_number columns
- Renamed methods: addRowIdColumn() → addRowLineageColumns(),
mergeFilesWithRowIdsAndSchema() → mergeFilesWithRowLineageAndSchema()
- Added writeSequenceNumberColumnChunk() method to write the sequence
number column
- Updated mergeFiles() signature to accept dataSequenceNumbers parameter
- The _last_updated_sequence_number is preserved from the original file's
dataSequenceNumber (not a new value)
- File: spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
- Extracted dataSequenceNumbers from DataFile objects (lines 239-240)
- Updated the MergeTaskInfo class to include a dataSequenceNumbers field
- Passed dataSequenceNumbers to ParquetFileMerger.mergeFiles(), as sketched below
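To make the data flow concrete, here is a rough sketch of the merge loop, with hypothetical helper names (the real ParquetFileMerger works at row-group granularity and its signatures differ):

```java
import java.util.List;

// Illustrative only: helper names, signatures, and the per-file (rather than
// per-row-group) granularity are simplifications of the real merger.
class RowLineageMergeSketch {
  void mergeFiles(List<String> inputs, List<Long> firstRowIds, List<Long> dataSequenceNumbers) {
    for (int i = 0; i < inputs.size(); i++) {
      // Copy this file's row groups without decoding them (binary copy).
      int rowCount = copyRowGroups(inputs.get(i));
      // Materialize _row_id from the row IDs already assigned to the file.
      writeRowIdColumnChunk(firstRowIds.get(i), rowCount);
      // Preserve the file's original dataSequenceNumber; no new value is minted.
      writeSequenceNumberColumnChunk(dataSequenceNumbers.get(i), rowCount);
    }
  }

  private int copyRowGroups(String file) { return 0; } // stub for the sketch
  private void writeRowIdColumnChunk(long firstRowId, int rowCount) {}
  private void writeSequenceNumberColumnChunk(long sequenceNumber, int rowCount) {}
}
```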
2. Refactored Code to Eliminate Duplication (user feedback)
- File: parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
- Created utility helper writeLongColumnChunk() (lines 527-574) that
accepts a LongUnaryOperator for value generation
- Both writeRowIdColumnChunk() and writeSequenceNumberColumnChunk() now
delegate to this helper
- Eliminated ~80 lines of duplicated code
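The shape of that refactor, as a minimal self-contained sketch (the real helper also drives the Parquet page writer, omitted here):

```java
import java.util.function.LongUnaryOperator;

// One helper maps a row ordinal to a long value; each lineage column supplies
// its own mapping, which removes the duplicated write loop.
class LongColumnChunkSketch {
  private void writeLongColumnChunk(int rowCount, LongUnaryOperator valueForRow) {
    for (int row = 0; row < rowCount; row++) {
      long value = valueForRow.applyAsLong(row);
      // ... hand `value` to the column's values writer (omitted in this sketch) ...
    }
  }

  // _row_id counts up from the chunk's first row ID ...
  void writeRowIdColumnChunk(long firstRowId, int rowCount) {
    writeLongColumnChunk(rowCount, row -> firstRowId + row);
  }

  // ... while _last_updated_sequence_number is constant within the chunk.
  void writeSequenceNumberColumnChunk(long dataSequenceNumber, int rowCount) {
    writeLongColumnChunk(rowCount, row -> dataSequenceNumber);
  }
}
```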
3. Used Parquet's Built-in Constants (pvary's comment)
- Files: ParquetUtil.java, ParquetWriter.java, SparkParquetFileMergeRunner.java
- Removed custom constants: COLUMN_INDEX_TRUNCATE_LENGTH and
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH
- Now using upstream constants:
- ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH
- ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH
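Both constants are public API in parquet-java, so a lookup can read the configured value with the upstream default; the surrounding Configuration plumbing below is illustrative, not Iceberg's actual code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;

class TruncateLengthSketch {
  // COLUMN_INDEX_TRUNCATE_LENGTH is the property key
  // ("parquet.columnindex.truncate.length"); ParquetProperties carries the default.
  static int columnIndexTruncateLength(Configuration conf) {
    return conf.getInt(
        ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH,
        ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
  }
}
```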
4. Added Comprehensive Row Lineage Tests (pvary's comment)
- File: parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFileMerger.java
- Added 6 new tests:
a. testMergeFilesSynthesizesRowLineageColumns() - Verifies synthesis of
both columns
b. testMergeFilesWithMultipleRowGroups() - Tests across multiple row
groups
c. testMergeFilesWithDifferentDataSequenceNumbers() - Verifies sequence
number preservation
d. testMergeFilesWithoutRowLineage() - Tests merging without row lineage
e. testMergeFilesWithPhysicalRowLineageColumns() - Tests binary copy of
existing columns
f. testCanMergeReturnsFalseForPhysicalRowLineageWithNulls() - Validates
null rejection
- Updated existing tests to use files with data instead of empty files
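The invariant behind the preservation test, restated as a standalone check (the record and names here are hypothetical, not the actual test code):

```java
import java.util.List;
import java.util.Map;

// After a merge, every row must carry the data sequence number of its source file.
class SequenceNumberInvariantSketch {
  record MergedRow(long rowId, long lastUpdatedSequenceNumber, String sourceFile) {}

  static void check(List<MergedRow> rows, Map<String, Long> fileToSequenceNumber) {
    for (MergedRow row : rows) {
      long expected = fileToSequenceNumber.get(row.sourceFile());
      if (row.lastUpdatedSequenceNumber() != expected) {
        throw new AssertionError("Sequence number not preserved for row " + row.rowId());
      }
    }
  }
}
```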
5. Changed Log Levels for Fallback Scenarios (user request)
- File: spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
- Changed 3 log statements from WARN to INFO (lines 104, 168, 176)
- These represent normal fallback scenarios, not errors
6. Added Two New Validation Checks (pvary's comments)
- File: spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
- Partition spec change check (lines 157-167): Falls back if the partition spec is changing (row-group merge cannot repartition data)
- File splitting check (lines 169-181): Falls back if any input file is larger than the target output size (row-group merge is optimized for merging, not splitting)
- Updated javadoc to document these new checks
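Condensed, the two checks amount to the following (parameter names are illustrative; the real code inspects the Spark rewrite group):

```java
import java.util.List;

class MergeEligibilitySketch {
  // Row-group merge copies row groups byte-for-byte, so it can neither
  // repartition data nor split an oversized input file.
  static boolean canUseRowGroupMerge(
      int fileSpecId, int tableSpecId, List<Long> inputFileSizes, long targetFileSizeBytes) {
    if (fileSpecId != tableSpecId) {
      return false; // partition spec is changing: fall back to the standard rewrite
    }
    for (long size : inputFileSizes) {
      if (size > targetFileSizeBytes) {
        return false; // this input would need splitting: fall back
      }
    }
    return true;
  }
}
```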
7. Updated OutputFileFactory Creation Pattern (pvary's comment)
- File: spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
- Changed from: OutputFileFactory.builderFor(table(), spec.specId(), batchIndex)
- Changed to: OutputFileFactory.builderFor(table(), batchIndex, 0).defaultSpec(spec)
8. Kept Defensive Validation (pvary's earlier comment)
- File: parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
- Maintained validateRowLineageColumnsHaveNoNulls() check in mergeFiles()
even though it's also called in canMerge()
- This is defensive programming since mergeFiles() is a public API that
can be called directly
Files Modified
1. parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
2. spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
3. parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
4. parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
5. parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFileMerger.java
Key Design Decisions
- Row lineage is treated as a cohesive unit (both _row_id and
_last_updated_sequence_number together)
- Preserved dataSequenceNumber from original files (not creating new
sequence numbers)
- Used DELTA_BINARY_PACKED encoding for both row lineage columns (optimal
compression)
- Fallback to the standard Spark rewrite is graceful and automatic when the optimization cannot be used
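On the encoding choice: within a column chunk, _row_id values are consecutive (deltas of 1) and _last_updated_sequence_number is a single repeated value (deltas of 0), which is close to the best case for DELTA_BINARY_PACKED. A toy illustration of the delta pattern the encoder sees (not writer code):

```java
class DeltaEncodingSketch {
  public static void main(String[] args) {
    long[] rowIds = {100, 101, 102, 103}; // _row_id within one chunk
    long[] seqNums = {7, 7, 7, 7};        // _last_updated_sequence_number
    for (int i = 1; i < rowIds.length; i++) {
      System.out.printf(
          "rowIdDelta=%d seqDelta=%d%n",
          rowIds[i] - rowIds[i - 1],  // always 1
          seqNums[i] - seqNums[i - 1]); // always 0
    }
  }
}
```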