ajantha-bhat commented on code in PR #9050:
URL: https://github.com/apache/iceberg/pull/9050#discussion_r1392528464
##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -2944,4 +2945,77 @@ private RowLevelOperationMode mode(Table table) {
String modeName = table.properties().getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
return RowLevelOperationMode.fromName(modeName);
}
+
+ @Test
+ public void multipleMergeIntoWithSerializableIsolation() throws InterruptedException {
+ // cannot run tests with concurrency for Hadoop tables without atomic renames
+ Assumptions.assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop");
+ // if caching is off, the table is eagerly refreshed during runtime filtering
+ // this can cause a validation exception as concurrent changes would be visible
+ Assumptions.assumeThat(cachingCatalogEnabled()).isTrue();
+
+ createAndInitTable("id INT, class INT, value INT");
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+ tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+ sql("INSERT INTO TABLE %s VALUES (1001, 1, 10)", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1002, 1, 20)", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1003, 2, 100)", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1004, 2, 200)", tableName);
+ createBranchIfNeeded();
+
+ Thread t1 =
+ new Thread(
+ () ->
+ sql(
+ "MERGE INTO %s t USING (\n"
+ + " SELECT 1005 as id, 2 as class, SUM(value) as value FROM %s WHERE class = 1\n"
+ + ") u \n"
+ + "ON t.id = u.id\n"
+ + "WHEN MATCHED THEN UPDATE SET t.value = u.value\n"
+ + "WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);",
+ commitTarget(), commitTarget()));
+
+ Thread t2 =
+ new Thread(
+ () ->
+ sql(
+ "MERGE INTO %s t USING (\n"
Review Comment:
> Also I didn't see the isolation level usage for `merge-on-read`. I need to dig deeper on that.

OK, I found the code for `merge-on-read` here (position deletes only, since Spark doesn't support writing equality deletes):
https://github.com/apache/iceberg/blob/188847428309701c2ec0debf9e94da94b39058b6/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L223-L225
We can add test cases for that too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]