ajantha-bhat commented on code in PR #9050: URL: https://github.com/apache/iceberg/pull/9050#discussion_r1392528464
##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -2944,4 +2945,77 @@ private RowLevelOperationMode mode(Table table) {
     String modeName = table.properties().getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);
   }
+
+  @Test
+  public void multipleMergeIntoWithSerializableIsolation() throws InterruptedException {
+    // cannot run tests with concurrency for Hadoop tables without atomic renames
+    Assumptions.assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop");
+    // if caching is off, the table is eagerly refreshed during runtime filtering
+    // this can cause a validation exception as concurrent changes would be visible
+    Assumptions.assumeThat(cachingCatalogEnabled()).isTrue();
+
+    createAndInitTable("id INT, class INT, value INT");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+    sql("INSERT INTO TABLE %s VALUES (1001, 1, 10)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1002, 1, 20)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1003, 2, 100)", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1004, 2, 200)", tableName);
+    createBranchIfNeeded();
+
+    Thread t1 =
+        new Thread(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING (\n"
+                        + " SELECT 1005 as id, 2 as class, SUM(value) as value FROM %s WHERE class = 1\n"
+                        + ") u \n"
+                        + "ON t.id = u.id\n"
+                        + "WHEN MATCHED THEN UPDATE SET t.value = u.value\n"
+                        + "WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);",
+                    commitTarget(), commitTarget()));
+
+    Thread t2 =
+        new Thread(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING (\n"

Review Comment:
   > Also I didn't see the isolation level usage for `merge-on-read`. I need to dig deeper on that.

   ok.
   I found the code for `merge-on-read` here (position deletes only, since Spark doesn't support writing equality deletes):
   https://github.com/apache/iceberg/blob/188847428309701c2ec0debf9e94da94b39058b6/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L223-L225

   We can add test cases for that too.