smaheshwar-pltr commented on code in PR #2031: URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102786896
########## tests/integration/test_reads.py: ########## @@ -1003,3 +1003,205 @@ def test_scan_with_datetime(catalog: Catalog) -> None: df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas() assert len(df) == 0 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_append_only(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + # Only "append"-operation snapshots occurred in this range + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + assert len(list(scan.plan_files())) == 2 + + # Check various read methods + assert len(scan.to_arrow()) == 3 + assert len(scan.to_arrow_batch_reader().read_all()) == 3 + assert len(scan.to_pandas()) == 3 + assert len(scan.to_polars()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = test_table.incremental_append_scan( + from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id, + # This is a "delete"-operation snapshot, that should be ignored by the append scan + to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id, + ) + + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + 
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # The schema within the snapshot range above included an extra date field, but the table was then replaced, + # removing it. An append scan always uses the current schema of the table. + expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + pa.field("letter", pa.string()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_row_filter(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(row_filter=EqualTo("letter", "b")) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2] + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(selected_fields=("number",)) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + assert sorted(result_table["number"].to_pylist()) == [2, 3, 4] + + 
+@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_limit(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(limit=2) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # Although three rows were added in the range, the limit of 2 should be applied + assert len(scan.to_arrow()) == 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_count(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + assert scan.count() == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")]) +def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + assert ( + len(test_table.incremental_append_scan().from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3 + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + snapshot_id = test_table.snapshots()[0].snapshot_id + + # Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified + 
assert ( + len( + test_table.incremental_append_scan() + .from_snapshot_exclusive(snapshot_id) + .to_snapshot_inclusive(snapshot_id) + .to_arrow() + ) + == 0 + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) Review Comment: (This test is parametrized with only the REST catalog, because that is the catalog in which the table replace happens, and the replace is what produces a different schema) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org