ajantha-bhat commented on code in PR #9050:
URL: https://github.com/apache/iceberg/pull/9050#discussion_r1392528464


##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -2944,4 +2945,77 @@ private RowLevelOperationMode mode(Table table) {
     String modeName = table.properties().getOrDefault(MERGE_MODE, 
MERGE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);
   }
+
+  @Test
+  public void multipleMergeIntoWithSerializableIsolation() throws 
InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic 
renames
+    Assumptions.assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop");
+    // if caching is off, the table is eagerly refreshed during runtime 
filtering
+    // this can cause a validation exception as concurrent changes would be 
visible
+    Assumptions.assumeThat(cachingCatalogEnabled()).isTrue();
+
+    createAndInitTable("id INT, class INT, value INT");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    sql("INSERT INTO TABLE %s VALUES (1001, 1, 10)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1002, 1, 20)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1003, 2, 100)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1004, 2, 200)", tableName);
+    createBranchIfNeeded();
+
+    Thread t1 =
+        new Thread(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING (\n"
+                        + "  SELECT 1005 as id, 2 as class, SUM(value)  as 
value FROM %s WHERE class = 1\n"
+                        + ") u \n"
+                        + "ON t.id = u.id\n"
+                        + "WHEN MATCHED THEN UPDATE SET t.value = u.value\n"
+                        + "WHEN NOT MATCHED THEN INSERT (id, class, value) 
VALUES (u.id, u.class, u.value);",
+                    commitTarget(), commitTarget()));
+
+    Thread t2 =
+        new Thread(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING (\n"

Review Comment:
   > Also I didn't see the isolation level usage for `merge-on-read`. I need to 
dig deeper on that. 
   
   ok. I found the code for `merge-on-read` here (only position deletes since 
spark doesn't support equality delete writing)
   
https://github.com/apache/iceberg/blob/188847428309701c2ec0debf9e94da94b39058b6/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L223-L225
   
   we can have the testcases for that too. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to