sungwy commented on code in PR #1772:
URL: https://github.com/apache/iceberg-python/pull/1772#discussion_r2049778093


##########
tests/integration/test_add_files.py:
##########
@@ -850,3 +850,70 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_delete_delete(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    tbl1.delete("string == 'z'")
+
+    with pytest.raises(
+        CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts."
+    ):
+        # tbl2 isn't aware of the commit by tbl1
+        tbl2.delete("string == 'z'")
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_delete_append(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    # This is allowed
+    tbl1.delete("string == 'z'")
+    tbl2.append(arrow_table_with_null)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_append_delete(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    tbl1.append(arrow_table_with_null)
+
+    with pytest.raises(
+        CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts."
+    ):
+        # tbl2 isn't aware of the commit by tbl1
+        tbl2.delete("string == 'z'")
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_append_append(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    tbl1.append(arrow_table_with_null)
+    tbl2.append(arrow_table_with_null)

Review Comment:
   could we introduce an assertion here to verify that the content of the table is as we'd expect (i.e. 3 * `arrow_table_with_null` worth of data)?
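   
   Something along these lines could work (just a sketch; the exact comparison is up to you):
   
   ```python
   # Hypothetical assertion: after both appends commit, a fresh scan of the
   # table should return three copies of the fixture data.
   expected = pa.concat_tables([arrow_table_with_null] * 3)
   result = session_catalog.load_table(identifier).scan().to_arrow()
   assert len(result) == len(expected)
   ```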



##########
pyiceberg/table/update/snapshot.py:
##########
@@ -279,6 +296,30 @@ def _commit(self) -> UpdatesAndRequirements:
             (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
         )
 
+    def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> None:
+        # Define allowed operations for each type of operation
+        allowed_operations = {
+            Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
+            Operation.REPLACE: {Operation.APPEND},

Review Comment:
   ```suggestion
               Operation.REPLACE: {},
   ```
   
   I think the spec may need a re-review, because I think it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when we are executing a `REPLACE` or `DELETE` operation.
   
   In Spark, we also validate whether there have been conflicting appends when we use the `SERIALIZABLE` isolation level:
   
   https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374
   
   I think it would be helpful to introduce all three isolation levels, `NONE`, `SERIALIZABLE`, and `SNAPSHOT`, and to verify whether conflicting appends or deletes have been introduced in the underlying partitions, so that we're aligned with the implementation in Spark.
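   
   To sketch the idea (hedged: the `IsolationLevel` enum and the two `_validate_no_conflicting_*` helpers are hypothetical names, not existing PyIceberg APIs):
   
   ```python
   from enum import Enum


   class IsolationLevel(Enum):
       NONE = "none"
       SNAPSHOT = "snapshot"
       SERIALIZABLE = "serializable"


   def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> None:
       if self._isolation_level == IsolationLevel.NONE:
           return
       # Both SNAPSHOT and SERIALIZABLE require that the files this operation
       # is deleting were not already removed by a concurrent commit.
       self._validate_no_conflicting_deletes(starting_snapshot, current_snapshot)
       # SERIALIZABLE additionally rejects concurrent appends in the affected
       # partitions, mirroring the SparkWrite behaviour linked above.
       if self._isolation_level == IsolationLevel.SERIALIZABLE:
           self._validate_no_conflicting_appends(starting_snapshot, current_snapshot)
   ```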



##########
tests/integration/test_add_files.py:
##########
@@ -850,3 +850,70 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_delete_delete(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    tbl1.delete("string == 'z'")
+
+    with pytest.raises(
+        CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts."
+    ):
+        # tbl2 isn't aware of the commit by tbl1
+        tbl2.delete("string == 'z'")
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_conflict_delete_append(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.test_conflict"
+    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
+    tbl1.append(arrow_table_with_null)
+    tbl2 = session_catalog.load_table(identifier)
+
+    # This is allowed
+    tbl1.delete("string == 'z'")
+    tbl2.append(arrow_table_with_null)

Review Comment:
   We should verify the content of the table here as well.
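   
   For example (a sketch; names are reused from the test above, and `pyarrow.compute` provides the row mask):
   
   ```python
   import pyarrow.compute as pc

   # Hypothetical check: one copy of the fixture minus the rows removed by
   # tbl1's delete, plus the full copy appended through tbl2.
   result = session_catalog.load_table(identifier).scan().to_arrow()
   mask = pc.equal(arrow_table_with_null["string"], "z")
   z_rows = len(arrow_table_with_null.filter(mask))
   assert len(result) == 2 * len(arrow_table_with_null) - z_rows
   ```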


