jqin61 commented on code in PR #931: URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1764223333
########## tests/integration/test_writes/test_partitioned_writes.py: ########## @@ -221,6 +276,98 @@ def test_query_filter_v1_v2_append_null( assert df.where(f"{col} is not null").count() == 4, f"Expected 4 non-null rows for {col}" assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}" +@pytest.mark.integration +@pytest.mark.parametrize( + "spec", + [ + (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_bucket"))), + (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=BucketTransform(2), name="long_bucket"))), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=BucketTransform(2), name="date_bucket"))), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=BucketTransform(2), name="timestamp_bucket"))), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=BucketTransform(2), name="timestamptz_bucket"))), + (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=BucketTransform(2), name="string_bucket"))), + (PartitionSpec(PartitionField(source_id=12, field_id=1001, transform=BucketTransform(2), name="fixed_bucket"))), + (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=BucketTransform(2), name="binary_bucket"))), + (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(2), name="int_trunc"))), + (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="long_trunc"))), + (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(2), name="string_trunc"))), + (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(2), name="binary_trunc"))), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_year"))), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_year"))), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_year"))), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_month"))), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_month"))), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_month"))), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_day"))), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_day"))), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_day"))), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_hour"))), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_hour"))), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=HourTransform(), name="date_hour"))), + ], +) +def test_dynamic_overwrite_non_identity_transform( + session_catalog: Catalog, arrow_table_with_null: pa.Table, spec: PartitionSpec +) -> None: + identifier = "default.dynamic_overwrite_non_identity_transform" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table( + identifier=identifier, + schema=TABLE_SCHEMA, + properties={"format-version": "2"}, + partition_spec=spec, + ) + with pytest.raises( + ValueError, + match="For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: *", + ): + tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1)) + + +@pytest.mark.integration +@pytest.mark.parametrize( + "part_col", + [ + "int", + "bool", + "string", + "string_long", + "long", + "float", + "double", + "date", + "timestamp", + "binary", + "timestamptz", + ], +) +@pytest.mark.parametrize( + "format_version", + [1, 2], +) +def test_dynamic_overwrite_unpartitioned_evolve_to_identity_transform( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, part_col: str, format_version: int +) -> None: + identifier = f"default.unpartitioned_table_v{format_version}_evolve_into_identity_transformed_partition_field_{part_col}" + tbl = session_catalog.create_table( + identifier=identifier, + schema=TABLE_SCHEMA, + properties={"format-version": "2"}, + ) + tbl.append(arrow_table_with_null) + tbl.update_spec().add_field(part_col, IdentityTransform(), f"{part_col}_identity").commit() + tbl.append(arrow_table_with_null) + # each column should be [a, null, b, a, null, b] + # dynamic overwrite a non-null row a, resulting in [null, b, null, b, a] + tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1)) + df = spark.table(identifier) + assert df.where(f"{part_col} is not null").count() == 3, f"Expected 3 non-null rows for {part_col}," + assert df.where(f"{part_col} is null").count() == 2, f"Expected 2 null rows for {part_col}," + Review Comment: I added the check. One interesting thing to notice is that a data file partitioned on a long string field cannot match strict metrics evaluator and will end up with an overwrite rather than delete (although the overwritten new file is the same). the reason is: for a long string, the lower bound and upper bound is truncated e.g. aaaaaaaaaaaaaaaaaaaaaa has lower bound of aaaaaaaaaaaaaaaa and upper bound of aaaaaaaaaaaaaaab and this makes strict metric evaluator determine the file evaluate as ROWS_MIGHT_NOT_MATCH which further causes the partitioned data file to be overwriten rather than deleted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org