stevenzwu commented on issue #16694:
URL: https://github.com/apache/iceberg/issues/16694#issuecomment-4699836457
# Milestone 1 design: v4 write/read plumbing with fixed 2-level tree + copy-on-write
## Scope
Milestone 1 keeps the v4 metadata tree at a **fixed 2-level shape** (root
manifest → leaf data/delete manifests) and uses **copy-on-write leaf rewrites**
for all mutations. The following are out of scope for this milestone:
- Direct data-file content_entries in the root manifest are reserved for a
follow-up (small-write optimization).
- Manifest delete vectors at the root level are reserved for a follow-up.
- Column-file DV updates are reserved for a follow-up.
The goal is end-to-end parity with v3 commit/scan paths so v4 tables can be
read and written through the existing API surface. Optimizations layer on top
of this substrate in later milestones — they are independent of each other and
can be built in parallel.
## Two-seam architecture
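The whole design rests on one principle: **all v4-shape transformation
happens at two thin boundary layers**. On read, that is
`BaseSnapshot.cacheManifests()` plus `ManifestFiles.read()`; on write, it is
`SnapshotProducer.apply()` plus `ManifestFiles.write()` (which selects
`ManifestWriter.V4Writer`). Everything above these layers (catalog,
`MergingSnapshotProducer`, `Table`, `TableScan`, `ManifestGroup`,
`DeleteFileIndex`, engine code) sees the same legacy
`DataFile`/`DeleteFile`/`ManifestFile`/`ManifestEntry` types it always saw. The
one exception is a v4-aware fork in `MergingSnapshotProducer` that collapses
position-delete `DeleteFile`s into colocated DVs.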
```
┌─────────────────────────────────────────────────────────────────┐
│ User APIs: Table, TableScan, AppendFiles, RowDelta, ...         │
│ (unchanged; sees only DataFile / DeleteFile / ManifestFile)     │
└──────────────────────────────────┬──────────────────────────────┘
                                   │
┌──────────────────────────────────┴──────────────────────────────┐
│ MergingSnapshotProducer / ManifestGroup / DeleteFileIndex       │
│ (mostly unchanged; one v4-aware fork in MergingSnapshotProducer │
│  to collapse position-delete DeleteFiles into colocated DVs)    │
└─────┬───────────────────────────────────────────────┬───────────┘
      │ write                                    read │
┌─────▼─────────────┐                       ┌─────────▼───────────┐
│ SEAM 1: write     │                       │ SEAM 2: read        │
│ SnapshotProducer  │                       │ BaseSnapshot.       │
│   .apply()        │                       │   cacheManifests()  │
│ ManifestFiles     │                       │ ManifestFiles       │
│   .write()        │                       │   .read()           │
└─────┬─────────────┘                       └─────────┬───────────┘
      │                                               │
┌─────▼───────────────────────────────────────────────▼───────────┐
│ v4 native: RootManifestWriter/Reader, ContentEntryAdapter,      │
│ ContentEntryReader, TrackedFileAdapters, TrackedFile schema     │
└─────────────────────────────────────────────────────────────────┘
```
## Write path: `table.newAppend().appendFile(d).commit()`
### 1. User-facing API is unchanged
`AppendFiles` → `MergeAppend` → `MergingSnapshotProducer` →
`SnapshotProducer`. None of these are v4-aware in their surface.
### 2. Subclass produces leaf manifests
`MergeAppend.apply(base, parent)` collects new `DataFile`s, opens a leaf
manifest via `SnapshotProducer.newManifestWriter(spec)`, writes the entries,
and returns the `List<ManifestFile>`. The leaf-write call goes through:
```
SnapshotProducer.newManifestWriter(spec)
→ ManifestFiles.write(formatVersion, spec, outputFile, snapshotId)
→ ManifestWriter.V4Writer (when formatVersion == 4)
```
`V4Writer.prepare()` is where `ManifestEntry<DataFile>` becomes a
`content_entry` row:
- `ContentEntryAdapter.fromDataFile(file, status, snapshotId, dataSeq,
fileSeq, schema)` → `TrackedFile`.
- `TrackedFileWrapper.wrap(...)` → positional `StructLike` for the Parquet
appender.
- The wrapped entry is what `writeRaw()` writes to disk.
What ends up on disk is a `.parquet` leaf manifest whose schema is
`TrackedFile.schemaWithContentStats(...)`, with `format-version=4` in the
Parquet footer key-value metadata. Each row is a content_entry: `tracking +
content_type=DATA + writer_format_version=1 + location + ...`.
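To make the entry-to-row conversion concrete, here is a minimal sketch of the mapping that `V4Writer.prepare()` is described as performing. Every type in it (`LegacyEntry`, `Tracking`, `ContentEntryRow`) is a hypothetical stand-in with a reduced field set, not the real Iceberg class, and the positional `StructLike` wrapping step is left out.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration; these are not the real Iceberg classes.
class ContentEntrySketch {
  enum Status { ADDED, EXISTING, DELETED }
  enum ContentType { DATA, EQUALITY_DELETES, DATA_MANIFEST, DELETE_MANIFEST }

  // Simplified view of a legacy ManifestEntry<DataFile>.
  record LegacyEntry(Status status, long snapshotId, long dataSeq, long fileSeq,
                     String location, long recordCount) {}

  // Simplified tracking struct and content_entry row.
  record Tracking(Status status, long snapshotId, long dataSeq, long fileSeq) {}
  record ContentEntryRow(Tracking tracking, ContentType contentType,
                         int writerFormatVersion, String location, long recordCount) {}

  // Plays the role the text assigns to ContentEntryAdapter.fromDataFile(...).
  static ContentEntryRow fromDataFile(LegacyEntry e) {
    Tracking tracking = new Tracking(e.status(), e.snapshotId(), e.dataSeq(), e.fileSeq());
    // Leaf rows for data files carry content_type=DATA and writer_format_version=1.
    return new ContentEntryRow(tracking, ContentType.DATA, 1, e.location(), e.recordCount());
  }

  // Converts every entry of one leaf manifest.
  static List<ContentEntryRow> prepareAll(List<LegacyEntry> entries) {
    return entries.stream().map(ContentEntrySketch::fromDataFile).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    LegacyEntry entry = new LegacyEntry(Status.ADDED, 42L, 7L, 7L, "data/f1.parquet", 100L);
    System.out.println(fromDataFile(entry));
  }
}
```

The point of the sketch is that the legacy entry's status and sequence numbers move into the `tracking` struct, while the file-level fields become top-level columns of the row.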
### 3. RowDelta collapses position-delete inputs into colocated DVs
The v4 spec forbids `content_type=POSITION_DELETES`. When
`RowDelta.addDeletes(positionDelete)` runs against a v4 table,
`MergingSnapshotProducer` doesn't emit a separate delete manifest. Instead it
groups by `referencedDataFile()` and rewrites the affected leaf manifest in
copy-on-write style:
- Original entry → `replacedEntry()` → content_entry with
`tracking.status=REPLACED`, no `deletion_vector`.
- Paired new entry → `modifiedEntry(entry, dv)` → same data file,
`tracking.status=MODIFIED`, `deletion_vector` struct populated from the input's
Puffin pointer.
Born-with-DV files (data file added in the same commit as its DV) emit a
single `ADDED` content_entry with `deletion_vector` populated — no
REPLACED/MODIFIED pair.
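The copy-on-write rewrite above can be sketched as follows. This is an illustration under assumptions, not the `MergingSnapshotProducer` code: `Entry` and `DvPointer` are hypothetical types, untouched entries are assumed to be carried over as `EXISTING`, and merging a new DV with one the data file already has is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a leaf rewrite; not the real Iceberg implementation.
class LeafRewriteSketch {
  enum Status { ADDED, EXISTING, REPLACED, MODIFIED }

  // Hypothetical pointer to a deletion vector blob inside a Puffin file.
  record DvPointer(String puffinPath, long offset, long sizeInBytes) {}

  // Simplified content_entry; dv == null means no deletion_vector.
  record Entry(String dataFile, Status status, DvPointer dv) {}

  // Rewrites one leaf. Entries whose data file receives a DV become a
  // REPLACED + MODIFIED pair; the rest are carried over (assumed EXISTING).
  static List<Entry> rewriteLeaf(List<Entry> leaf, Map<String, DvPointer> dvByDataFile) {
    List<Entry> out = new ArrayList<>();
    for (Entry e : leaf) {
      DvPointer dv = dvByDataFile.get(e.dataFile());
      if (dv == null) {
        out.add(new Entry(e.dataFile(), Status.EXISTING, e.dv()));
      } else {
        out.add(new Entry(e.dataFile(), Status.REPLACED, null));
        out.add(new Entry(e.dataFile(), Status.MODIFIED, dv));
      }
    }
    return out;
  }

  // Born-with-DV: data file and DV arrive in the same commit, so one ADDED row.
  static Entry bornWithDv(String dataFile, DvPointer dv) {
    return new Entry(dataFile, Status.ADDED, dv);
  }

  public static void main(String[] args) {
    List<Entry> leaf = List.of(
        new Entry("a.parquet", Status.EXISTING, null),
        new Entry("b.parquet", Status.EXISTING, null));
    DvPointer dv = new DvPointer("dv-1.puffin", 4L, 32L);
    rewriteLeaf(leaf, Map.of("b.parquet", dv)).forEach(System.out::println);
  }
}
```

Grouping the inputs by `referencedDataFile()` is what produces the `dvByDataFile` map here; the rewrite itself is then a single pass over the affected leaf.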
### 4. `SnapshotProducer.apply()` writes the root manifest
After the subclass returns, `apply()` branches on `base.formatVersion()`:
- `formatVersion < 4` (existing behavior): `ManifestLists.write(...)` to a
`.avro` manifest list, `BaseSnapshot` constructed with `manifestListLocation =
path, rootManifestLocation = null`.
- `formatVersion >= 4`: opens `rootManifestPath()` (a `.parquet` file) and
  calls `RootManifests.write(...)`, which returns a `RootManifestWriter`.
  - Each `ManifestFile` from the subclass is added via
    `add(manifest, status, writerFormatVersion)`:
    - newly-written leaves → `ADDED`, `writer_format_version=1`
    - carried-over from prior snapshot → `EXISTING`, `writer_format_version=1`
      (or `0` for v3 leaves carried over during a v3→v4 upgrade)
  - Each call goes through `ContentEntryAdapter.fromManifestFile(...)` to
    produce a content_entry with
    `content_type ∈ {DATA_MANIFEST, DELETE_MANIFEST}` and `manifest_info`
    populated from the ManifestFile's counts.
  - `BaseSnapshot` is constructed with
    `manifestListLocation = null, rootManifestLocation = path, formatVersion = 4`.
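As a rough illustration of the v4 branch, the sketch below assigns `status` and `writer_format_version` to each root-manifest row. `Leaf` and `RootEntry` are hypothetical types, and deciding "newly written" by membership in a set of paths is an assumption made for the example, not how `SnapshotProducer` necessarily tracks it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of root-manifest row construction; not the real Iceberg code.
class RootEntrySketch {
  enum Status { ADDED, EXISTING }
  enum ContentType { DATA_MANIFEST, DELETE_MANIFEST }

  // Simplified view of a leaf ManifestFile returned by the subclass.
  record Leaf(String path, boolean deleteManifest, int leafFormatVersion) {}

  // Simplified root-manifest content_entry.
  record RootEntry(String path, ContentType contentType, Status status, int writerFormatVersion) {}

  static List<RootEntry> buildRoot(List<Leaf> leaves, Set<String> writtenInThisCommit) {
    List<RootEntry> root = new ArrayList<>();
    for (Leaf leaf : leaves) {
      Status status = writtenInThisCommit.contains(leaf.path()) ? Status.ADDED : Status.EXISTING;
      // 1 for v4 content_entry leaves, 0 for v3 leaves carried over during an upgrade.
      int writerFormatVersion = leaf.leafFormatVersion() >= 4 ? 1 : 0;
      ContentType type = leaf.deleteManifest() ? ContentType.DELETE_MANIFEST : ContentType.DATA_MANIFEST;
      root.add(new RootEntry(leaf.path(), type, status, writerFormatVersion));
    }
    return root;
  }

  public static void main(String[] args) {
    List<Leaf> leaves = List.of(
        new Leaf("m-new.parquet", false, 4),
        new Leaf("m-old.parquet", true, 4),
        new Leaf("m-v3.avro", false, 3));
    buildRoot(leaves, Set.of("m-new.parquet")).forEach(System.out::println);
  }
}
```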
### 5. Snapshot commits to the catalog
The `BaseSnapshot` lands in `TableMetadata.snapshots()` and gets serialized
by `SnapshotParser.toJson(...)`. v4 snapshots emit `"root-manifest": "..."`
instead of `"manifest-list": "..."`. The format version threads through
`TableMetadataParser.fromJson(...)` so on the read side `SnapshotParser` knows
which key to expect.
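A small sketch of why the format version has to reach the snapshot parser: the same location value is stored under a different key depending on the version. The class below is hypothetical and uses an ordered map as a stand-in for the JSON object; it is not `SnapshotParser`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of version-dependent key selection; not SnapshotParser.
class SnapshotKeySketch {
  static final String MANIFEST_LIST = "manifest-list";
  static final String ROOT_MANIFEST = "root-manifest";

  static String locationKey(int formatVersion) {
    return formatVersion >= 4 ? ROOT_MANIFEST : MANIFEST_LIST;
  }

  // Writes the minimal fields into an ordered map standing in for the JSON object.
  static Map<String, Object> toFields(long snapshotId, int formatVersion, String location) {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("snapshot-id", snapshotId);
    fields.put(locationKey(formatVersion), location);
    return fields;
  }

  // The reader needs the table's format version to know which key to look up.
  static String readLocation(Map<String, Object> fields, int formatVersion) {
    String key = locationKey(formatVersion);
    Object value = fields.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required field: " + key);
    }
    return value.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> v4 = toFields(1L, 4, "metadata/root-1.parquet");
    System.out.println(v4);
    System.out.println(readLocation(v4, 4));
  }
}
```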
## Read path: `table.newScan().planFiles()`
### 1. User-facing API is unchanged
`Table.newScan()` → `BaseTableScan` → `Scan.planFiles()` → returns
`CloseableIterable<FileScanTask>`. Same shape regardless of format version.
### 2. Resolving the manifest tree
`BaseTableScan` calls `snapshot.dataManifests(io)` /
`snapshot.deleteManifests(io)`, which both go through
`BaseSnapshot.cacheManifests()`. That method branches on the `formatVersion`
field stored on `BaseSnapshot`:
```
formatVersion >= 4
? RootManifests.read(io.newInputFile(rootManifestLocation), specsById)
: ManifestLists.read(io.newInputFile(manifestListLocation))
```
Both calls return `List<ManifestFile>`. `RootManifestReader` reconstructs
each `ManifestFile` from a content_entry with `content_type ∈ {DATA_MANIFEST,
DELETE_MANIFEST}` — path, length, partition spec id, sequence number, and all
the count fields are populated from `manifest_info`. Direct data-file
content_entries (`content_type ∈ {DATA, EQUALITY_DELETES}`) are skipped with a
debug log in milestone 1; reading those becomes first-class when the
small-write optimization lands.
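The reconstruction step can be pictured with the sketch below, which keeps manifest rows and skips direct entries with a debug-level log. `RootRow`, `ManifestInfo`, and `ManifestRef` are hypothetical, reduced types; the real `RootManifestReader` and `ManifestFile` carry more fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical model of root-manifest reading in milestone 1; not the real reader.
class RootReadSketch {
  private static final Logger LOG = Logger.getLogger(RootReadSketch.class.getName());

  enum ContentType { DATA, EQUALITY_DELETES, DATA_MANIFEST, DELETE_MANIFEST }

  // Reduced stand-in for the manifest_info struct.
  record ManifestInfo(int addedFiles, int existingFiles, int deletedFiles) {}

  // Reduced stand-in for a root-manifest content_entry row.
  record RootRow(ContentType contentType, String location, long length,
                 int specId, long sequenceNumber, ManifestInfo manifestInfo) {}

  // Reduced stand-in for the legacy ManifestFile.
  record ManifestRef(String path, long length, int specId, long sequenceNumber,
                     boolean deleteManifest, ManifestInfo counts) {}

  static List<ManifestRef> toManifests(List<RootRow> rows) {
    List<ManifestRef> manifests = new ArrayList<>();
    for (RootRow row : rows) {
      ContentType type = row.contentType();
      if (type == ContentType.DATA_MANIFEST || type == ContentType.DELETE_MANIFEST) {
        manifests.add(new ManifestRef(row.location(), row.length(), row.specId(),
            row.sequenceNumber(), type == ContentType.DELETE_MANIFEST, row.manifestInfo()));
      } else {
        // Direct data-file entries are not read in milestone 1.
        LOG.fine("Skipping direct content_entry: " + row.location());
      }
    }
    return manifests;
  }

  public static void main(String[] args) {
    ManifestInfo info = new ManifestInfo(3, 0, 0);
    List<RootRow> rows = List.of(
        new RootRow(ContentType.DATA_MANIFEST, "m1.parquet", 1024L, 0, 5L, info),
        new RootRow(ContentType.DATA, "direct.parquet", 2048L, 0, 5L, null));
    System.out.println(toManifests(rows));
  }
}
```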
### 3. Reading individual manifests
For each `ManifestFile`, `ManifestGroup` opens a `ManifestReader<DataFile>`
via `ManifestFiles.read(manifest, io, specsById, writerFormatVersion)`.
Inside `ManifestFiles.read()`:
- A `writerFormatVersion` hint `>= 1` is supplied, or schema-shape detection
finds field 134 (`content_type`) / 147 (`tracking`) → `ContentEntryReader`
wrapped in `ContentEntryManifestReaderAdapter` (which extends
`ManifestReader<F>`).
- Otherwise → existing `ManifestReader<F>` for v1–v3 manifests.
Both return the same `ManifestReader<F>` API, so `ManifestGroup`'s
consumption code is identical.
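The dispatch rule reads roughly like the sketch below. The class and enum are hypothetical, and treating the presence of either field ID as sufficient is an assumption, since the text does not say whether both must be present.

```java
import java.util.Set;

// Hypothetical model of reader selection; not the real ManifestFiles.read().
class ReaderDispatchSketch {
  static final int CONTENT_TYPE_FIELD_ID = 134;
  static final int TRACKING_FIELD_ID = 147;

  enum ReaderKind { CONTENT_ENTRY, LEGACY }

  // hint: writer_format_version from the root manifest, or null when unknown.
  // fileFieldIds: field IDs found in the manifest file's own schema.
  static ReaderKind choose(Integer writerFormatVersionHint, Set<Integer> fileFieldIds) {
    if (writerFormatVersionHint != null && writerFormatVersionHint >= 1) {
      return ReaderKind.CONTENT_ENTRY;
    }
    // Fall back to schema-shape detection (assumption: either ID is enough).
    if (fileFieldIds.contains(CONTENT_TYPE_FIELD_ID) || fileFieldIds.contains(TRACKING_FIELD_ID)) {
      return ReaderKind.CONTENT_ENTRY;
    }
    return ReaderKind.LEGACY;
  }

  public static void main(String[] args) {
    System.out.println(choose(1, Set.of()));
    System.out.println(choose(null, Set.of(1, 2, 134)));
    System.out.println(choose(0, Set.of(1, 2, 3)));
  }
}
```

With a hint of `0` (a v3 leaf carried over during an upgrade) the sketch falls through to schema detection and ends up on the legacy reader.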
### 4. content_entry → legacy types via `TrackedFileAdapters`
`ContentEntryReader` reads each content_entry row into a
`TrackedFileStruct`, then projects it through `TrackedFileAdapters` (#16100, on
main):
| content_entry shape | Projection |
|---|---|
| `content_type=DATA`, no DV | `asDataFile(trackedFile, specsById)` → emits a `ManifestEntry<DataFile>` |
| `content_type=DATA`, non-null `deletion_vector` | emits **two** entries: `asDataFile(...)` to the data stream, `asDVDeleteFile(...)` to the delete stream |
| `content_type=EQUALITY_DELETES` | `asEqualityDeleteFile(trackedFile, specsById)` → emits a `ManifestEntry<DeleteFile>` |
`TrackedDVDeleteFile` exposes the colocated DV as a synthetic `DeleteFile`
of `FileContent.POSITION_DELETES` whose `referencedDataFile()` /
`contentOffset()` / `contentSizeInBytes()` come from the v4 `deletion_vector`
struct. Downstream `DeleteFileIndex.builderFor(...)` groups DVs by
`referencedDataFile()` exactly as it always has.
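The projection rules can be sketched as a single pass that fans each row out to the data stream, the delete stream, or both. All types here are hypothetical simplifications; in particular, using the Puffin path as the synthetic delete file's path is an assumption for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the content_entry projection; not TrackedFileAdapters.
class ProjectionSketch {
  enum ContentType { DATA, EQUALITY_DELETES }
  enum FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES }

  // Reduced stand-in for the v4 deletion_vector struct.
  record Dv(String puffinPath, long offset, long sizeInBytes) {}

  // Reduced stand-in for a leaf content_entry row.
  record TrackedRow(ContentType contentType, String location, Dv deletionVector) {}

  // Reduced stand-in for the legacy DataFile / DeleteFile view.
  record LegacyFile(FileContent content, String path, String referencedDataFile,
                    Long contentOffset, Long contentSizeInBytes) {}

  record Projection(List<LegacyFile> dataStream, List<LegacyFile> deleteStream) {}

  static Projection project(List<TrackedRow> rows) {
    List<LegacyFile> data = new ArrayList<>();
    List<LegacyFile> deletes = new ArrayList<>();
    for (TrackedRow row : rows) {
      if (row.contentType() == ContentType.EQUALITY_DELETES) {
        deletes.add(new LegacyFile(FileContent.EQUALITY_DELETES, row.location(), null, null, null));
        continue;
      }
      data.add(new LegacyFile(FileContent.DATA, row.location(), null, null, null));
      Dv dv = row.deletionVector();
      if (dv != null) {
        // A colocated DV surfaces as a synthetic position-delete file.
        deletes.add(new LegacyFile(FileContent.POSITION_DELETES, dv.puffinPath(),
            row.location(), dv.offset(), dv.sizeInBytes()));
      }
    }
    return new Projection(data, deletes);
  }

  public static void main(String[] args) {
    List<TrackedRow> rows = List.of(
        new TrackedRow(ContentType.DATA, "a.parquet", null),
        new TrackedRow(ContentType.DATA, "b.parquet", new Dv("dv.puffin", 4L, 32L)),
        new TrackedRow(ContentType.EQUALITY_DELETES, "eq.parquet", null));
    System.out.println(project(rows));
  }
}
```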
### 5. Scan planning consumes the projection
`ManifestGroup.planFiles()` filters `ManifestEntry<DataFile>` by partition
predicates and stats, then `DeleteFileIndex` matches each surviving `DataFile`
against position-delete and equality-delete `DeleteFile`s. The result is
`FileScanTask(file, deletes)` — same shape as v3. `FileScanTask.deletes()`
includes the synthesized DV `DeleteFile`, so the engine reads the data file
with the DV applied without knowing it came from a colocated `deletion_vector`
struct.
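For illustration, the matching step for DVs reduces to a lookup keyed by the referenced data file. The sketch below uses hypothetical `DeleteRef` and `Task` types and leaves out partition and stats filtering as well as equality-delete matching, so it is not a model of the full `DeleteFileIndex`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of DV-to-data-file matching; not the real DeleteFileIndex.
class ScanPlanSketch {
  // Synthetic position-delete file that points at exactly one data file.
  record DeleteRef(String path, String referencedDataFile) {}

  // Reduced stand-in for FileScanTask(file, deletes).
  record Task(String dataFile, List<DeleteRef> deletes) {}

  static Map<String, List<DeleteRef>> indexByDataFile(List<DeleteRef> dvs) {
    Map<String, List<DeleteRef>> index = new HashMap<>();
    for (DeleteRef dv : dvs) {
      index.computeIfAbsent(dv.referencedDataFile(), k -> new ArrayList<>()).add(dv);
    }
    return index;
  }

  // One task per data file, with any DVs that reference it attached.
  static List<Task> plan(List<String> dataFiles, List<DeleteRef> dvs) {
    Map<String, List<DeleteRef>> index = indexByDataFile(dvs);
    List<Task> tasks = new ArrayList<>();
    for (String dataFile : dataFiles) {
      tasks.add(new Task(dataFile, index.getOrDefault(dataFile, List.of())));
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<DeleteRef> dvs = List.of(new DeleteRef("dv.puffin", "b.parquet"));
    plan(List.of("a.parquet", "b.parquet"), dvs).forEach(System.out::println);
  }
}
```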
## Future optimizations (parallelizable on top of milestone 1)
These layer on top of the milestone-1 substrate. Each is independent and can
be built without blocking the others:
1. **Adaptive tree shape (1-level or 2-level)**. Small writes emit data-file
content_entries directly in the root manifest, bypassing leaf-manifest
creation. `RootManifestWriter` already accepts the shape; `RootManifestReader`
currently skips `content_type ∈ {DATA, EQUALITY_DELETES}` with a debug log —
those become first-class. Introduces a write-time heuristic for when to flatten.
2. **Manifest delete vectors at the root**. Marks a leaf manifest's row as
deleted via a manifest DV in the root manifest's `manifest_info.dv` field,
instead of rewriting the leaf in copy-on-write fashion. Cuts write
amplification for high-churn workloads. Requires read-side support for applying
root-level manifest DVs when projecting the manifest tree.
3. **Column-file DV updates**. Update individual data DV columns via column
files (the spec's `column_files` field on `content_type ∈ {DATA,
DATA_MANIFEST}`) rather than rewriting the entire leaf. Reduces write cost when
only DV state changes for a subset of columns.
## Known issues to track separately
- Parquet rejects empty groups, which prevents writing v4 leaf manifests for
unpartitioned tables when the partition struct has no fields. Milestone-1 tests
skip the unpartitioned Parquet cases. Resolution likely requires omitting the
partition column from the Parquet schema at write time when the spec is
unpartitioned, while keeping the canonical `content_entry` schema unchanged.
- `replacedFilesCount` / `replacedRowsCount` from the leaf-manifest counters
do not yet round-trip through the root manifest's `manifest_info`. Tests verify
the raw leaf rows directly. Requires populating those fields in
`ContentEntryAdapter.fromManifestFile(...)` and reading them back in
`RootManifestReader.toManifestFile(...)`.