jqin61 commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1764223333


##########
tests/integration/test_writes/test_partitioned_writes.py:
##########
@@ -221,6 +276,98 @@ def test_query_filter_v1_v2_append_null(
         assert df.where(f"{col} is not null").count() == 4, f"Expected 4 
non-null rows for {col}"
         assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows 
for {col}"
 
@pytest.mark.integration
@pytest.mark.parametrize(
    "spec",
    [
        # Bucket transforms over each bucketable source column.
        PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_bucket")),
        PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=BucketTransform(2), name="long_bucket")),
        PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=BucketTransform(2), name="date_bucket")),
        PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=BucketTransform(2), name="timestamp_bucket")),
        PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=BucketTransform(2), name="timestamptz_bucket")),
        PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=BucketTransform(2), name="string_bucket")),
        PartitionSpec(PartitionField(source_id=12, field_id=1001, transform=BucketTransform(2), name="fixed_bucket")),
        PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=BucketTransform(2), name="binary_bucket")),
        # Truncate transforms over each truncatable source column.
        PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(2), name="int_trunc")),
        PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="long_trunc")),
        PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(2), name="string_trunc")),
        PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(2), name="binary_trunc")),
        # Temporal transforms (year/month/day/hour) over timestamp, timestamptz, and date columns.
        PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_year")),
        PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_year")),
        PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_year")),
        PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_month")),
        PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_month")),
        PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_month")),
        PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_day")),
        PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_day")),
        PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_day")),
        PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_hour")),
        PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_hour")),
        PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=HourTransform(), name="date_hour")),
    ],
)
def test_dynamic_overwrite_non_identity_transform(
    session_catalog: Catalog, arrow_table_with_null: pa.Table, spec: PartitionSpec
) -> None:
    """Dynamic overwrite must raise when the latest partition spec has a non-identity transform."""
    identifier = "default.dynamic_overwrite_non_identity_transform"
    # Drop any leftover table so the test is idempotent across re-runs.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=TABLE_SCHEMA,
        properties={"format-version": "2"},
        partition_spec=spec,
    )
    # `match` is a regex applied with re.search, so a message prefix is enough.
    # (The original pattern ended in " *" — zero-or-more spaces — which was a
    # no-op left over from glob-style thinking; it has been removed.)
    with pytest.raises(
        ValueError,
        match="For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec:",
    ):
        tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "part_col",
    [
        "int",
        "bool",
        "string",
        "string_long",
        "long",
        "float",
        "double",
        "date",
        "timestamp",
        "binary",
        "timestamptz",
    ],
)
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
def test_dynamic_overwrite_unpartitioned_evolve_to_identity_transform(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, part_col: str, format_version: int
) -> None:
    """Dynamic overwrite on a table that evolved from unpartitioned to identity-partitioned.

    Appends the fixture once while unpartitioned, evolves the spec to an identity
    transform on ``part_col``, appends again, then dynamically overwrites the single
    non-null row and verifies the resulting row counts through Spark.
    """
    identifier = f"default.unpartitioned_table_v{format_version}_evolve_into_identity_transformed_partition_field_{part_col}"
    # Drop any leftover table so the test is idempotent across re-runs
    # (mirrors the cleanup done by the other tests in this module).
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=TABLE_SCHEMA,
        # Bug fix: honor the parametrized format_version — it was hard-coded to "2",
        # so the format_version=1 parametrization never actually tested a v1 table.
        properties={"format-version": str(format_version)},
    )
    tbl.append(arrow_table_with_null)
    tbl.update_spec().add_field(part_col, IdentityTransform(), f"{part_col}_identity").commit()
    tbl.append(arrow_table_with_null)
    # Each column is now [a, null, b, a, null, b] (the 3-row fixture appended twice).
    # Dynamically overwriting the single non-null row `a` should leave [null, b, null, b, a].
    tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
    df = spark.table(identifier)
    assert df.where(f"{part_col} is not null").count() == 3, f"Expected 3 non-null rows for {part_col}"
    assert df.where(f"{part_col} is null").count() == 2, f"Expected 2 null rows for {part_col}"
+

Review Comment:
   I added the check.
   
   One interesting thing to note is that a data file partitioned on a long string field cannot satisfy the strict metrics evaluator, and so it ends up being overwritten rather than deleted (although the newly written file is identical to the old one). The reason is that for a long string the lower and upper bounds are truncated — e.g. `aaaaaaaaaaaaaaaaaaaaaa` gets a lower bound of `aaaaaaaaaaaaaaaa` and an upper bound of `aaaaaaaaaaaaaaab` — which makes the strict metrics evaluator report the file as ROWS_MIGHT_NOT_MATCH, and that in turn causes the partitioned data file to be overwritten rather than deleted.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to