vinjai commented on code in PR #2371:
URL: https://github.com/apache/iceberg-python/pull/2371#discussion_r2367374634


##########
tests/integration/test_catalog.py:
##########
@@ -218,6 +224,146 @@ def test_table_exists(test_catalog: Catalog, 
table_schema_nested: Schema, databa
     assert test_catalog.table_exists((database_name, table_name)) is True
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+    )
+
+    expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), 
"new_col"))
+
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("new_col", IntegerType())
+
+        with transaction.update_spec() as update_spec:
+            update_spec.add_field("new_col", IdentityTransform())
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+    assert table.spec().fields == expected_spec.fields
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    if isinstance(test_catalog, HiveCatalog):
+        pytest.skip("HiveCatalog fails in this test, need to investigate")
+
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    original_update = table.update_schema().add_column("new_col", LongType())
+
+    # Update schema concurrently so that the original update fails
+    concurrent_update = 
test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
+    concurrent_update.commit()
+
+    expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", 
TimestampType(), False))
+
+    with pytest.raises(CommitFailedException):
+        original_update.commit()
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: 
Schema, table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table_transaction = test_catalog.create_table_transaction(identifier, 
test_schema)
+    assert not test_catalog.table_exists(identifier)

Review Comment:
   nitpick: This assertion is redundant and can be removed, since table 
   existence is already checked further down in the test.



##########
tests/integration/test_catalog.py:
##########
@@ -218,6 +224,146 @@ def test_table_exists(test_catalog: Catalog, 
table_schema_nested: Schema, databa
     assert test_catalog.table_exists((database_name, table_name)) is True
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+    )
+
+    expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), 
"new_col"))
+
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update_schema:

Review Comment:
   nitpick: 
   Instead of using nested `with` blocks, the code can be simplified to:
   
   `transaction.update_schema().add_column("new_col", IntegerType()).commit()`
   



##########
tests/integration/test_catalog.py:
##########
@@ -218,6 +224,146 @@ def test_table_exists(test_catalog: Catalog, 
table_schema_nested: Schema, databa
     assert test_catalog.table_exists((database_name, table_name)) is True
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+    )
+
+    expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), 
"new_col"))
+
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("new_col", IntegerType())
+
+        with transaction.update_spec() as update_spec:
+            update_spec.add_field("new_col", IdentityTransform())
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+    assert table.spec().fields == expected_spec.fields
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    if isinstance(test_catalog, HiveCatalog):
+        pytest.skip("HiveCatalog fails in this test, need to investigate")
+
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    original_update = table.update_schema().add_column("new_col", LongType())
+
+    # Update schema concurrently so that the original update fails
+    concurrent_update = 
test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
+    concurrent_update.commit()
+
+    expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", 
TimestampType(), False))
+
+    with pytest.raises(CommitFailedException):
+        original_update.commit()
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: 
Schema, table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table_transaction = test_catalog.create_table_transaction(identifier, 
test_schema)
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.update_schema().add_column("new_col", 
IntegerType()).commit()
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.commit_transaction()
+    assert test_catalog.table_exists(identifier)
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().find_type("new_col").is_primitive
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_multiple_schemas(
+    test_catalog: Catalog, test_schema: Schema, test_partition_spec: 
PartitionSpec, table_name: str, database_name: str
+) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table_transaction = test_catalog.create_table_transaction(
+        identifier=identifier,
+        schema=test_schema,
+        partition_spec=test_partition_spec,
+        sort_order=SortOrder(SortField(source_id=1)),
+    )
+    assert not test_catalog.table_exists(identifier)

Review Comment:
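   nitpick: Same as the earlier comment on 
   `test_create_table_transaction_simple`: this assertion is redundant and 
   can be removed.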



##########
tests/integration/test_catalog.py:
##########
@@ -218,6 +224,146 @@ def test_table_exists(test_catalog: Catalog, 
table_schema_nested: Schema, databa
     assert test_catalog.table_exists((database_name, table_name)) is True
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, 
table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+    )
+
+    expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), 
"new_col"))
+
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("new_col", IntegerType())
+
+        with transaction.update_spec() as update_spec:

Review Comment:
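   nitpick:
   Same as the earlier comment on `transaction.update_schema()`: instead of a 
   nested `with`, this can be simplified to:
   
   `transaction.update_spec().add_field("new_col", IdentityTransform()).commit()`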



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to