smaheshwar-pltr commented on code in PR #3364:
URL: https://github.com/apache/iceberg-python/pull/3364#discussion_r3260056879


##########
tests/integration/test_reads.py:
##########
@@ -1272,3 +1272,138 @@ def test_scan_source_field_missing_in_spec(catalog: 
Catalog, spark: SparkSession
 
     table = catalog.load_table(identifier)
     assert len(list(table.scan().plan_files())) == 3
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog_hive"), 
lf("session_catalog")])
+def test_incremental_append_scan_append_only(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan()
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    # snapshots[1] adds 1 file (letter=b); snapshots[2] adds 2 files 
(letter=b, letter=c).
+    assert len(list(scan.plan_files())) == 3
+    assert sorted(scan.to_arrow()["number"].to_pylist()) == [2, 3, 4]
+
+    # All read paths return the same rows.
+    assert len(scan.to_arrow_batch_reader().read_all()) == 3
+    assert len(scan.to_pandas()) == 3
+    assert len(scan.to_polars()) == 3
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog_hive"), 
lf("session_catalog")])
+def test_incremental_append_scan_ignores_non_append_snapshots(catalog: 
Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    # snapshots[3] is a delete. The append scan must ignore it.
+    scan = test_table.incremental_append_scan(
+        from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
+        to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id,
+    )
+    assert len(list(scan.plan_files())) == 3
+    assert sorted(scan.to_arrow()["number"].to_pylist()) == [2, 3, 4]
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog_hive"), 
lf("session_catalog")])
+def test_incremental_append_scan_schema_evolution_within_range(catalog: 
Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    # snapshots[1..2] are on the original schema (number, letter); 
snapshots[4] is on the evolved
+    # schema (number, letter, extra) after ALTER TABLE ADD COLUMN. The scan 
must project the older
+    # rows onto the current schema (extra -> null) and pick up the new value 
for the newer row.
+    scan = (
+        test_table.incremental_append_scan()
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[4].snapshot_id)
+    )
+    assert len(list(scan.plan_files())) == 4
+
+    expected_schema = pa.schema([pa.field("number", pa.int32()), 
pa.field("letter", pa.string()), pa.field("extra", pa.int32())])
+    result_table = scan.to_arrow()
+    assert result_table.schema.equals(expected_schema)
+    rows = zip(
+        result_table["number"].to_pylist(),
+        result_table["letter"].to_pylist(),
+        result_table["extra"].to_pylist(),
+        strict=True,
+    )
+    assert sorted(rows, key=lambda r: r[0]) == [(2, "b", None), (3, "c", 
None), (4, "b", None), (5, "d", 100)]
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog_hive"), 
lf("session_catalog")])
+def test_incremental_append_scan_partition_pruning(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    # `letter=c` only appears in snapshots[2]. The manifest evaluator rejects 
snapshots[1]'s
+    # manifest (letter=b only); the partition evaluator rejects the letter=b 
entry in
+    # snapshots[2]'s manifest. One file remains.
+    scan = (
+        test_table.incremental_append_scan(row_filter=EqualTo("letter", "c"))
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+    assert len(list(scan.plan_files())) == 1
+    assert scan.to_arrow()["number"].to_pylist() == [3]
+
+
[email protected]
[email protected]("catalog", [lf("session_catalog_hive"), 
lf("session_catalog")])
+def test_incremental_append_scan_metrics_pruning(catalog: Catalog) -> None:

Review Comment:
   Filters on a non-partition column (`number`), so the manifest and partition 
evaluators degenerate to ALWAYS_TRUE and it's the per-file metrics evaluator 
(column min/max/null stats) that must do all the pruning. Covers a layer of 
`ManifestGroupPlanner` that the existing `DataScan` integration coverage 
doesn't exercise end-to-end through a real scan.



##########
dev/provision.py:
##########
@@ -395,3 +395,37 @@
     )
     spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str 
WRITE ORDERED BY id")
     spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str 
VALUES 'a', 'c'")
+
+    # Append scan fixture. Snapshots written:
+    #   0: append (1, 'a')
+    #   1: append (2, 'b')
+    #   2: append (3, 'c'), (4, 'b')
+    #   3: delete number=2
+    #   4: append (5, 'd', 100) -- on evolved schema
+    #   5: replace table -- lineage break
+    spark.sql(
+        f"""
+            CREATE OR REPLACE TABLE 
{catalog_name}.default.test_incremental_read (

Review Comment:
   Partitioned by `letter` so the per-snapshot file layout is deterministic 
across Spark versions, which lets the integration tests assert on exact 
`plan_files()` counts. `ALTER ADD COLUMN extra` is included so the 
schema-evolution-within-range test runs on the same fixture, and the closing 
`REPLACE TABLE` is for the disconnected-snapshots test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to