sungwy commented on code in PR #1772: URL: https://github.com/apache/iceberg-python/pull/1772#discussion_r2049778093
########## tests/integration/test_add_files.py: ########## @@ -850,3 +850,70 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True) assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + tbl1.delete("string == 'z'") + + with pytest.raises( + CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts." 
+ ): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + # This is allowed + tbl1.delete("string == 'z'") + tbl2.append(arrow_table_with_null) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + tbl1.append(arrow_table_with_null) + + with pytest.raises( + CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts." + ): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + tbl1.append(arrow_table_with_null) + tbl2.append(arrow_table_with_null) Review Comment: could we introduce an assertion here to verify the content of the table is as we'd expect? 
(the table should contain three copies of the arrow_table_with_null data). ########## pyiceberg/table/update/snapshot.py: ########## @@ -279,6 +296,30 @@ def _commit(self) -> UpdatesAndRequirements: (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) + def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> None: + # Define allowed operations for each type of operation + allowed_operations = { + Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, + Operation.REPLACE: {Operation.APPEND}, Review Comment: ```suggestion Operation.REPLACE: {}, ``` I think the spec may need a re-review: it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when executing a `REPLACE` or `DELETE` operation. In Spark, we also validate whether there have been conflicting appends when using the `SERIALIZABLE` isolation level: https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374 I think it would be helpful to introduce all three isolation levels (`NONE`, `SERIALIZABLE`, and `SNAPSHOT`) and, to align with Spark's implementation, verify whether conflicting appends or deletes have been introduced in the underlying partitions. ########## tests/integration/test_add_files.py: ########## @@ -850,3 +850,70 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True) assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, 
format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + tbl1.delete("string == 'z'") + + with pytest.raises( + CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts." + ): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) + tbl2 = session_catalog.load_table(identifier) + + # This is allowed + tbl1.delete("string == 'z'") + tbl2.append(arrow_table_with_null) Review Comment: We should verify the content of the table here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org