RussellSpitzer commented on issue #12653: URL: https://github.com/apache/iceberg/issues/12653#issuecomment-2755353265
I believe the issue is related to the optimizations here:
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L47-L75

That code takes a MERGE INTO command whose only action is `WHEN NOT MATCHED THEN INSERT` and rewrites it into an `AppendData.byPosition` instead of a `RowDelta` like the one built here:
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L129-L138

Apache Iceberg treats `AppendData` as a plain batch write, which always runs at what is effectively snapshot isolation, regardless of the isolation level configured for row-level operations (`write.merge.isolation-level`). As a result, MERGE INTO commands that contain only `WHEN NOT MATCHED THEN INSERT` (no `WHEN MATCHED` clauses) run at the wrong isolation level. The append is not validated against data committed concurrently, so two such merges running at the same time can both insert the same rows, which leads to the issue above.

Repro (`TestMerge.java`):

```java
@TestTemplate
public void testMergeIntoEmptyTableSerializableIsolation() throws InterruptedException {
  createAndInitTable("id INT, dep STRING");
  sql(
      "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
      tableName, MERGE_ISOLATION_LEVEL, "serializable");

  createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
          + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
          + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

  // Submit the same NOT MATCHED-only MERGE twice so the two runs can overlap.
  // An exception thrown by either merge stays in its discarded Future and does not fail the test.
  executorService.submit(() -> spark.sql(String.format(
      "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *",
      tableName)).collect());
  executorService.submit(() -> spark.sql(String.format(
      "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *",
      tableName)).collect());

  executorService.shutdown();
  executorService.awaitTermination(1, TimeUnit.MINUTES);

  // Under serializable isolation each source row should be present exactly once, whether the
  // merges overlapped (one is expected to fail validation) or ran one after the other.
  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "emp-id-1"), // new
          row(2, "emp-id-2"), // new
          row(3, "emp-id-3") // new
          );
  assertEquals(
      "Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
}
```
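To make the isolation argument concrete, here is a toy, in-memory model of two writers that each plan a NOT MATCHED-only merge against the same empty snapshot and then commit one after the other. Everything in it (`ToyTable`, `planInserts`, `commit`, `validateNewData`) is hypothetical and is not Iceberg's or Spark's API. Iceberg's real conflict checks are considerably more involved than comparing individual ids. With `validateNewData = false` the commit behaves like the plain append described above. With `true` it mimics a serializable-style check that rejects a commit when data added after the writer's read snapshot would have matched its `ON t.id == s.id` condition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: hypothetical classes, not Iceberg's or Spark's API.
public class MergeIsolationSketch {

  // Hypothetical in-memory table: committed ids plus the ids added by each commit.
  static final class ToyTable {
    final List<Integer> rows = new ArrayList<>();
    // addedPerVersion.get(v) holds the ids added by the commit that produced version v + 1
    final List<Set<Integer>> addedPerVersion = new ArrayList<>();

    int version() {
      return addedPerVersion.size();
    }
  }

  // Ids a NOT MATCHED-only merge would insert: source ids absent from the snapshot it read.
  static Set<Integer> planInserts(List<Integer> snapshotRows, List<Integer> sourceIds) {
    Set<Integer> inserts = new HashSet<>(sourceIds);
    inserts.removeAll(snapshotRows);
    return inserts;
  }

  // validateNewData = false: plain append, no conflict check.
  // validateNewData = true: reject the commit if any commit made after readVersion added an id
  // that this merge's ON condition (t.id == s.id) would have matched.
  static boolean commit(
      ToyTable table,
      int readVersion,
      Set<Integer> inserts,
      List<Integer> sourceIds,
      boolean validateNewData) {
    if (validateNewData) {
      for (int v = readVersion; v < table.version(); v++) {
        for (int id : table.addedPerVersion.get(v)) {
          if (sourceIds.contains(id)) {
            return false; // conflicting data was committed concurrently
          }
        }
      }
    }
    table.rows.addAll(inserts);
    table.addedPerVersion.add(inserts);
    return true;
  }

  // Two writers plan against the same empty snapshot, then commit one after the other.
  public static int rowsAfterTwoConcurrentMerges(boolean validateNewData) {
    ToyTable table = new ToyTable();
    List<Integer> source = List.of(1, 2, 3);

    int readVersion = table.version(); // both writers read version 0
    Set<Integer> insertsA = planInserts(List.copyOf(table.rows), source);
    Set<Integer> insertsB = planInserts(List.copyOf(table.rows), source);

    commit(table, readVersion, insertsA, source, validateNewData);
    commit(table, readVersion, insertsB, source, validateNewData); // rejected when validating
    return table.rows.size();
  }

  public static void main(String[] args) {
    System.out.println("append, no validation: " + rowsAfterTwoConcurrentMerges(false)); // 6
    System.out.println("serializable-style check: " + rowsAfterTwoConcurrentMerges(true)); // 3
  }
}
```

In the append path both writers' three rows land, giving six rows. With validation on, the second commit is simply rejected and the table keeps one copy of each source row, which is the outcome the repro test expects.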