talatuyarer opened a new pull request, #14264:
URL: https://github.com/apache/iceberg/pull/14264
This PR extends `BaseIncrementalChangelogScan` to support positional and
equality delete files (merge-on-read, MoR, use cases), enabling complete CDC
coverage.
I read the full discussion on @wypoon's #10935. That PR is old and does not
cover all cases, so I decided to implement this with a fresh approach and a
reduced scope. The earlier PR included a Spark implementation; this PR focuses
only on core module support. With this implementation, changelog scans now
correctly produce `DeletedRowsScanTask`, `DeletedDataFileScanTask`, and
`AddedRowsScanTask` even when delete files are present.
_I implemented unit tests for the following scenarios:_
Scenario | Expected Behavior
-- | --
Insert → Equality Delete → Re-insert | One DELETE + one INSERT changelog event
Insert & Delete in Same Commit | Emitted as one task with attached deletes
Overlapping Equality + Position Deletes | Deduplicated delete application
Existing Deletes + New Deletes | Both applied without duplication
Overwrite Snapshot with Deletes | No NPE, correct ADDED/DELETED tasks
Large Delete Set | Pruned by partition filter
#### Step-by-Step Planning Logic
The `doPlanFiles` method now:
1. Builds the existing delete index from the base snapshot
2. Builds per-snapshot added delete indexes for each changelog snapshot
3. Plans data file tasks (ADDED/DELETED) with appropriate delete files
attached
4. Plans additional `DeletedRowsScanTask` instances for EXISTING data files
affected by new delete files
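
The four steps above can be sketched with simplified stand-in types. This is a minimal illustration under stated assumptions, not the code in this PR: `ChangelogPlanSketch`, `Snapshot`, `Task`, and `Kind` are hypothetical names, files are plain strings, and each delete file is keyed directly by the data file it affects (real matching depends on partitions and sequence numbers).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins only: none of these types are Iceberg classes.
final class ChangelogPlanSketch {
  enum Kind { ADDED_ROWS, DELETED_DATA_FILE, DELETED_ROWS }

  // One changelog snapshot: data files added/removed, plus new delete files
  // keyed by the data file they apply to (a simplifying assumption).
  record Snapshot(long id, List<String> added, List<String> removed,
                  Map<String, List<String>> newDeletes) {}

  record Task(long snapshotId, Kind kind, String dataFile,
              List<String> addedDeletes, List<String> existingDeletes) {}

  static List<Task> plan(Set<String> baseFiles,
                         Map<String, List<String>> baseDeletes,
                         List<Snapshot> snapshots) {
    // Step 1: build the existing delete index from the base snapshot.
    Map<String, List<String>> index = new HashMap<>();
    baseDeletes.forEach((file, deletes) -> index.put(file, new ArrayList<>(deletes)));
    Set<String> live = new LinkedHashSet<>(baseFiles);
    List<Task> tasks = new ArrayList<>();

    for (Snapshot snap : snapshots) {
      // Step 2: the per-snapshot index of newly added delete files.
      Map<String, List<String>> added = snap.newDeletes();

      // Step 3: ADDED and DELETED data file tasks with deletes attached.
      for (String file : snap.added()) {
        tasks.add(new Task(snap.id(), Kind.ADDED_ROWS, file,
            lookup(added, file), lookup(index, file)));
      }
      for (String file : snap.removed()) {
        tasks.add(new Task(snap.id(), Kind.DELETED_DATA_FILE, file,
            List.of(), lookup(index, file)));
      }

      // Step 4: EXISTING data files affected by new delete files.
      for (String file : live) {
        if (added.containsKey(file) && !snap.removed().contains(file)) {
          tasks.add(new Task(snap.id(), Kind.DELETED_ROWS, file,
              lookup(added, file), lookup(index, file)));
        }
      }

      // Fold this snapshot into the running state instead of rebuilding it.
      live.removeAll(snap.removed());
      live.addAll(snap.added());
      added.forEach((file, deletes) ->
          index.computeIfAbsent(file, k -> new ArrayList<>()).addAll(deletes));
    }
    return tasks;
  }

  private static List<String> lookup(Map<String, List<String>> deletes, String file) {
    return List.copyOf(deletes.getOrDefault(file, List.of()));
  }

  public static void main(String[] args) {
    Snapshot s1 = new Snapshot(1L, List.of("b.parquet"), List.of(),
        Map.of("a.parquet", List.of("d1")));
    plan(Set.of("a.parquet"), Map.of(), List.of(s1)).forEach(System.out::println);
  }
}
```

Folding each snapshot's deletes into the running index at the end of the loop is what lets a later snapshot see them as existing deletes without re-reading earlier snapshots.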
#### Three Types of Changelog Tasks
1. **AddedRowsScanTask**: For newly added data files
   - Includes both existing (inherited) deletes and deletes newly added in
     the same snapshot
   - Produces INSERT changelog rows
2. **DeletedDataFileScanTask**: For deleted data files
   - Includes existing deletes that were present before the file was deleted
   - Produces DELETE changelog rows for the entire file
3. **DeletedRowsScanTask**: For existing data files with new delete files
   - `addedDeletes()`: New delete files added in this snapshot (generate
     DELETE rows)
   - `existingDeletes()`: Delete files that existed before (applied for
     filtering)
   - Produces DELETE changelog rows only for newly deleted records
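
To make the `addedDeletes()` / `existingDeletes()` split concrete, here is a small sketch of how a reader might decide which rows of an existing data file produce DELETE events. It is an illustration only: `DeletedRowsSketch` and `newlyDeleted` are hypothetical names, and both delete sets are assumed to be already resolved to row positions, which glosses over how equality deletes are matched against rows.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Iceberg or of this PR.
final class DeletedRowsSketch {
  // Returns the positions that should surface as DELETE changelog rows:
  // removed by the newly added deletes and not already removed by existing ones.
  static List<Long> newlyDeleted(long rowCount,
                                 Set<Long> existingDeleted,
                                 Set<Long> addedDeleted) {
    TreeSet<Long> result = new TreeSet<>(); // a set, so overlapping deletes collapse
    for (long pos : addedDeleted) {
      boolean inFile = pos >= 0 && pos < rowCount;
      if (inFile && !existingDeleted.contains(pos)) {
        result.add(pos);
      }
    }
    return List.copyOf(result); // sorted by position
  }

  public static void main(String[] args) {
    System.out.println(newlyDeleted(5, Set.of(1L), Set.of(1L, 3L)));
  }
}
```

A row already removed by an existing delete is filtered out rather than re-emitted, which is the behavior the "Existing Deletes + New Deletes" scenario in the test table expects.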
_Processing multiple snapshots and equality deletes is expensive for huge
tables, so I also added a few performance optimizations._
Optimization | Description
-- | --
Manifest caching | Introduced `DeleteManifestCache` to avoid redundant manifest parsing
Partition pruning | Uses filter pushdown to skip irrelevant delete manifests
Incremental accumulation | Avoids rebuilding the delete index for every snapshot
Selective copy | `ContentFileUtil.copy()` keeps only essential stats to reduce memory footprint
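
The manifest caching idea can be illustrated with a small memoizing wrapper. This is a sketch under assumptions, not the API of `DeleteManifestCache`: `ManifestCacheSketch`, its `reader` function, and the `reads()` counter are hypothetical, and manifests are identified by a plain path string.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache: parses each delete manifest at most once per path.
final class ManifestCacheSketch<T> {
  private final Map<String, List<T>> entries = new ConcurrentHashMap<>();
  private final Function<String, List<T>> reader; // assumed manifest parser
  private final AtomicInteger reads = new AtomicInteger();

  ManifestCacheSketch(Function<String, List<T>> reader) {
    this.reader = reader;
  }

  List<T> get(String manifestPath) {
    // computeIfAbsent invokes the reader only on the first request for a path.
    return entries.computeIfAbsent(manifestPath, path -> {
      reads.incrementAndGet();
      return List.copyOf(reader.apply(path));
    });
  }

  int reads() {
    return reads.get();
  }

  public static void main(String[] args) {
    ManifestCacheSketch<String> cache =
        new ManifestCacheSketch<>(path -> List.of(path + "#delete-1"));
    cache.get("m1.avro");
    cache.get("m1.avro");
    System.out.println("manifest reads: " + cache.reads());
  }
}
```

When several changelog snapshots reference the same delete manifest, a cache of this shape turns the repeated parses into lookups, at the cost of holding the parsed entries in memory for the duration of planning.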
#### Possible Future Work
- V3 deletion vector support. This PR tries to lay the foundation for
deletion-vector-based CDC.
- Add persistent delete index cache
- Engine level support