RussellSpitzer commented on issue #12653:
URL: https://github.com/apache/iceberg/issues/12653#issuecomment-2755353265

   I believe the issue is related to the optimization here:
   
   
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L47-L75
   
   The code above takes a MergeIntoCommand that contains only "NOT MATCHED 
INSERT" actions and rewrites it into an AppendData.byPosition instead of a 
RowDelta, as is done here:
   
   
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L129-L138
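
To make the branching concrete, here is a minimal toy model of the decision described above. It is not Spark's code (the linked rule is written in Scala); `Clause`, `Plan`, and `rewrite` are hypothetical names invented for this sketch, and it only assumes that an insert-only MERGE goes down the append path while anything else goes down the row-level path.

```java
import java.util.List;

public class MergeRewriteSketch {
  // Hypothetical stand-ins for the kinds of clauses a MERGE can carry.
  enum Clause { MATCHED, NOT_MATCHED, NOT_MATCHED_BY_SOURCE }

  // Hypothetical labels for the two plan shapes discussed above.
  enum Plan { APPEND_DATA, ROW_LEVEL_OPERATION }

  // Assumption: a MERGE whose clauses are all NOT MATCHED is planned as an append.
  static Plan rewrite(List<Clause> clauses) {
    boolean insertOnly = !clauses.isEmpty()
        && clauses.stream().allMatch(c -> c == Clause.NOT_MATCHED);
    return insertOnly ? Plan.APPEND_DATA : Plan.ROW_LEVEL_OPERATION;
  }

  public static void main(String[] args) {
    System.out.println(rewrite(List.of(Clause.NOT_MATCHED)));                 // APPEND_DATA
    System.out.println(rewrite(List.of(Clause.MATCHED, Clause.NOT_MATCHED))); // ROW_LEVEL_OPERATION
  }
}
```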
   
   Apache Iceberg treats AppendData as a batch write, which always operates 
under the equivalent of "Snapshot Isolation", regardless of the isolation level 
configured for RowDeltas. This means MERGE INTO commands that have only a NOT 
MATCHED INSERT clause and no MATCHED clause run at the wrong isolation level, 
which leads to the issue above.
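
As a rough illustration of why the isolation level matters here, the following toy model has two writers read the same empty table, compute the same "not matched" rows, and then commit one after the other. It is a sketch under stated assumptions, not Iceberg's implementation: `Table`, `commit`, and `runTwoWriters` are hypothetical, and the only rule modelled is that a serializable commit is rejected when data was added after the writer read the table, while a snapshot-isolation append is accepted.

```java
import java.util.ArrayList;
import java.util.List;

public class IsolationSketch {
  enum Isolation { SNAPSHOT, SERIALIZABLE }

  // Toy table: each accepted commit appends a batch of ids and bumps the version.
  static class Table {
    final List<Integer> rows = new ArrayList<>();
    int version = 0;

    // Returns true if the commit was accepted.
    synchronized boolean commit(int readVersion, List<Integer> newRows, Isolation level) {
      boolean concurrentAppend = version != readVersion;
      if (level == Isolation.SERIALIZABLE && concurrentAppend) {
        return false; // data was added after this writer read the table
      }
      rows.addAll(newRows);
      version++;
      return true;
    }
  }

  // Both writers read the same (empty) version, then commit in turn.
  static Table runTwoWriters(Isolation level) {
    Table table = new Table();
    List<Integer> source = List.of(1, 2, 3);
    int readVersion = table.version;
    table.commit(readVersion, source, level);
    table.commit(readVersion, source, level);
    return table;
  }

  public static void main(String[] args) {
    System.out.println(runTwoWriters(Isolation.SNAPSHOT).rows.size());     // 6 (duplicates)
    System.out.println(runTwoWriters(Isolation.SERIALIZABLE).rows.size()); // 3
  }
}
```

In this model the snapshot-isolation run ends with every source row inserted twice, which matches the symptom described for the insert-only MERGE.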
   
   
   Repro (in TestMerge.java):
   
   ```java
     @TestTemplate
     public void testMergeIntoEmptyTableSerializableIsolation()
         throws InterruptedException {
       createAndInitTable("id INT, dep STRING");
       sql(
           "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
           tableName, MERGE_ISOLATION_LEVEL, "serializable");
   
       createOrReplaceView(
         "source",
         "id INT, dep STRING",
         "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
           + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
           + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");
   
       ExecutorService executorService =
         MoreExecutors.getExitingExecutorService(
           (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
   
       // Run the same insert-only MERGE twice, concurrently.
       String mergeSql =
           String.format(
               "MERGE INTO %s AS t USING source AS s ON t.id == s.id "
                   + "WHEN NOT MATCHED THEN INSERT *",
               tableName);
       executorService.submit(() -> spark.sql(mergeSql).collect());
       executorService.submit(() -> spark.sql(mergeSql).collect());
   
       executorService.shutdown();
       executorService.awaitTermination(1, TimeUnit.MINUTES);
   
       ImmutableList<Object[]> expectedRows =
         ImmutableList.of(
           row(1, "emp-id-1"), // new
           row(2, "emp-id-2"), // new
           row(3, "emp-id-3")  // new
         );
   
       assertEquals(
         "Should have expected rows",
         expectedRows,
         sql("SELECT * FROM %s ORDER BY id", tableName));
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

