mbutrovich commented on PR #2153:
URL: https://github.com/apache/iceberg-rust/pull/2153#issuecomment-4255224845

   I paired up with Claude to review for a while today, cross-referencing the 
Java behavior and spec. Suggestion 3 is the one I feel the least strongly 
about, and would benefit from another set of eyes.
   
    Here are the compiled notes:
   
   ### 1. Separate scan type, don't bolt onto `TableScanBuilder`
   
   The spec defines snapshots with `parent-snapshot-id` and `operation` ([Spec: 
Snapshots](https://iceberg.apache.org/spec/#snapshots)). Incremental scanning 
isn't in the spec; it's a library concept built on these primitives. It 
operates on a *range* of snapshots (from/to/inclusive), which is fundamentally 
different from a point-in-time scan on a single snapshot. Mixing both into one 
builder creates an invalid state space.
   
   The PR adds `from_snapshot_id`, `from_snapshot_inclusive`, and 
`to_snapshot_id` to `TableScanBuilder` (`crates/iceberg/src/scan/mod.rs`), 
mutually exclusive with `snapshot_id`, enforced at runtime in `build()`. It 
would be worth making illegal states unrepresentable at compile time instead.
   
   One option: a parallel entry point on `Table` 
(`crates/iceberg/src/table.rs`) with a separate builder:
   
   ```rust
   // crates/iceberg/src/table.rs
   impl Table {
       pub fn incremental_append_scan(&self) -> IncrementalAppendScanBuilder { 
... }
   }
   
   // crates/iceberg/src/scan/incremental.rs
   pub struct IncrementalAppendScanBuilder<'a> {
       table: &'a Table,
       from_snapshot_id: i64,
       from_inclusive: bool,
       to_snapshot_id: Option<i64>,  // None = current snapshot
       // shared fields: column_names, filter, concurrency limits, etc.
   }
   ```
   
   Both builders produce the same `TableScan` (which already has `plan_context: 
Option<PlanContext>`); the difference is how `PlanContext` gets configured. 
Shared builder logic (column selection, filters, concurrency) can be extracted 
into a helper. `TableScanBuilder` stays unchanged. This also leaves room for 
`IncrementalChangelogScanBuilder` later.
   
   Java went through this exact evolution. The original design (2020, 
`2562649b097a`, iceberg#315) subclassed `DataTableScan` as 
`IncrementalDataTableScan`, which inherited inapplicable methods and had to 
throw `UnsupportedOperationException` everywhere (lines 40-61). Five PRs over 
2022 redesigned it into a sibling hierarchy where `IncrementalScan<T>` and 
`TableScan` are peers under `Scan`:
   
   | Commit | PR | Change |
   |---|---|---|
   | `beed94dc2134` | iceberg#4580 | `IncrementalAppendScan` interface |
   | `7f472ebbec19` | iceberg#4744 | `BaseIncrementalAppendScan` impl |
   | `c69a3dd6171e` | iceberg#4870 | `IncrementalScan<T>` parent, 
`IncrementalChangelogScan` |
   | `40de4bc7dc12` | iceberg#5382 | `BaseIncrementalScan` extracted |
   | `80ec14ff363b` | iceberg#5577 | Deprecated `appendsBetween`/`appendsAfter` 
on `TableScan`, removal in 2.0.0 |
   
   Entry point moved from `TableScan.appendsBetween()` to 
`Table.newIncrementalAppendScan()` 
(`api/src/main/java/org/apache/iceberg/Table.java:71-84`).
   
   ---
   
   ### 2. Reuse `ancestors_between`
   
   The spec defines snapshot ancestry via `parent-snapshot-id` ([Spec: 
Snapshots](https://iceberg.apache.org/spec/#snapshots)). This codebase already 
has the traversal in `crates/iceberg/src/util/snapshot.rs:51-61`:
   
   ```rust
   pub fn ancestors_between(
       table_metadata: &TableMetadataRef,
       latest_snapshot_id: i64,
       oldest_snapshot_id: Option<i64>,
   ) -> impl Iterator<Item = SnapshotRef> + Send
   ```
   
   Returns `(oldest, latest]`, which is already exclusive-start, inclusive-end: 
exactly the default incremental range.
   
   `SnapshotRange::build()` reimplements this walk from scratch. It could use 
the existing utility instead:
   
   ```rust
   let snapshots_in_range: Vec<SnapshotRef> = if from_inclusive {
       ancestors_between(&metadata, to_id, from_parent_id).collect()
   } else {
       ancestors_between(&metadata, to_id, Some(from_id)).collect()
   };
   
   let snapshot_ids: HashSet<i64> = snapshots_in_range.iter()
       .filter(|s| s.summary().operation == Operation::Append)
       .map(|s| s.snapshot_id())
       .collect();
   ```
   
   One thing to watch: `ancestors_between` doesn't validate that 
`oldest_snapshot_id` is actually in the chain. If it's not, you get the full 
chain to root. Worth verifying ancestry explicitly (check the walk terminated 
at the expected snapshot, not root).
   
   Java equivalent: `SnapshotUtil.ancestorsBetween` 
(`core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:212-229`).
   
   ---
   
   ### 3. Skip non-APPEND snapshots, don't error
   
   The spec defines four `operation` values: `append`, `replace`, `overwrite`, 
`delete` ([Spec: Snapshot 
Summary](https://iceberg.apache.org/spec/#snapshot-summaries)). The spec 
doesn't define incremental scan semantics, so we have to decide what to do with 
non-APPEND snapshots in the range.
   
   The PR returns `ErrorKind::FeatureUnsupported` for any non-APPEND snapshot. 
This seems too strict. A table that had a compaction (`replace`) in its history 
would break incremental append scans, even though the compaction added no 
logical data.
   
   It would be better to silently skip non-APPEND snapshots. Only 
`Operation::Append` contributes new data files to an append scan:
   
   - `Replace` = compaction/rewrite, no new logical data
   - `Delete` = removes data
   - `Overwrite` = mixed add/remove, ambiguous for append-only
   
   ```rust
   let append_snapshot_ids: HashSet<i64> = ancestors_between(...)
       .filter(|s| s.summary().operation == Operation::Append)
       .map(|s| s.snapshot_id())
       .collect();
   ```
   
   Properly surfacing `Overwrite` changes would be the domain of an 
`IncrementalChangelogScan`, which could be a separate scan type down the road.
   
   Java's `BaseIncrementalAppendScan` 
(`core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java:105-117`)
 silently skips all non-APPEND snapshots. The deprecated 
`IncrementalDataTableScan` errored on OVERWRITE (lines 138-143) but that 
behavior was not carried forward.
   
   ---
   
   ### 4. Consider manifest-level filtering
   
   The spec says each manifest list entry tracks the snapshot that created it 
via `added-snapshot-id` ([Spec: Manifest 
Lists](https://iceberg.apache.org/spec/#manifest-lists)). A manifest whose 
`added-snapshot-id` is outside the scan range can't contain relevant `ADDED` 
entries. Skipping it avoids the I/O and parse cost entirely.
   
   The PR only filters at the entry level 
(`crates/iceberg/src/scan/context.rs`, 
`fetch_manifest_and_stream_manifest_entries`). Every manifest in the manifest 
list is fetched and parsed, including inherited ones that can't contain 
relevant entries. For tables with many manifests, this matters.
   
   It might be worth adding a check in 
`PlanContext::build_manifest_file_contexts` 
(`crates/iceberg/src/scan/context.rs:195-254`). `ManifestFile` already has the 
field (`crates/iceberg/src/spec/manifest_list.rs`):
   
   ```rust
   // Inside the loop over manifest_files:
   if let Some(ref snapshot_range) = self.snapshot_range {
       if manifest_file.content == ManifestContentType::Deletes {
           continue;
       }
       if !snapshot_range.contains(manifest_file.added_snapshot_id) {
           continue;
       }
   }
   ```
   
   Keep the entry-level filter too; a manifest can contain entries from 
multiple snapshots. But the manifest-level check avoids unnecessary I/O.
   
   Java applies both: `.filterManifests(m -> 
snapshotIds.contains(m.snapshotId()))` (line 70) then 
`.filterManifestEntries(...)` (line 81), plus `.ignoreDeleted()`. See 
`core/src/main/java/org/apache/iceberg/ManifestGroup.java` lines 121, 132, 
311-320, 375.
   
   ---
   
   ### 5. DataFusion: consider a scan range enum
   
   The PR threads `from_snapshot_id: Option<i64>` and `from_snapshot_inclusive: 
bool` through `IcebergTableScan::new()`, `get_batch_stream()`, 
`IcebergStaticTableProvider`, and `IcebergTableProvider` 
(`crates/integrations/datafusion/src/physical_plan/scan.rs:60-84`). This 
triggers `#[allow(clippy::too_many_arguments)]`.
   
   An enum could clean this up:
   
   ```rust
   pub(crate) enum ScanRange {
       CurrentSnapshot,
       PointInTime(i64),
       Incremental { from: i64, to: Option<i64>, from_inclusive: bool },
   }
   ```
   
   This would replace three parameters with one across all four structs, and 
would collapse `IcebergStaticTableProvider`'s three near-identical constructors 
(`try_new_incremental`, `try_new_incremental_inclusive`, 
`try_new_appends_after`) into one.
   
   ---
   
   ### 6. Tests don't verify filtering behavior
   
   **Tautological assertions.** These tests assert `result.is_ok() || 
result.is_err()`, which is always true:
   
   - `test_appends_after_convenience_method` (`crates/iceberg/src/scan/mod.rs`)
   - `test_appends_between_convenience_method`
   - `test_incremental_scan_from_snapshot_inclusive`
   
   **Swallowed errors.** DataFusion tests use `if let Ok(provider) = provider { 
... }`, passing silently when construction fails:
   
   - `test_static_provider_incremental_creates_scan` 
(`crates/integrations/datafusion/src/table/mod.rs`)
   - `test_static_provider_incremental_inclusive`
   - `test_static_provider_appends_after`
   
   **No end-to-end test.** No test checks that an incremental scan actually 
returns only files from the expected snapshot range.
   
   Something along the lines of `test_plan_files_no_deletions` 
(`crates/iceberg/src/scan/mod.rs:1278`) would work well here:
   
   1. Extend `TableTestFixture` with multiple APPEND snapshots and manifest 
entries with explicit `snapshot_id` and `ManifestStatus` values
   2. Run an incremental scan between two snapshots
   3. Collect `FileScanTask`s and assert only files from the expected snapshots 
come back
   4. Test edge cases: inclusive vs exclusive, same from/to, non-APPEND 
snapshots skipped
   
   `TableTestFixture::new_with_deep_history()` already creates 5 chained 
snapshots (S1 through S5), which could be extended with manifest files and 
operation types in the test metadata JSON.
   
   ---
   
   ### 7. Minor
   
   - **Example file** (`crates/examples/src/datafusion_incremental_read.rs`): 
author says "claude wrote it for me." Feels verbose; could probably be trimmed 
or dropped if the API doc comments cover the usage well enough.
   - **Comment style**: `/// Optional snapshot range for incremental scans` 
explains what, not why. Might want to match codebase convention.
   - **Unrelated fix**: `context.rs` changes `self.case_sensitive` to 
`case_sensitive` after destructuring. Correct but unrelated; might be better as 
a separate commit.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to