cccs-jc opened a new issue, #11094:
URL: https://github.com/apache/iceberg/issues/11094

   ### Query engine
   
   - Spark 3.5
   - Iceberg 1.5.2
   - Azure storage account ABFSS
   
   ### Question
   
   I’m trying to use the `MERGE INTO` statement within a Spark streaming 
application that runs several streaming queries. These queries all write to the 
same target table.
   
   I understand that the `MERGE INTO` statement performs a copy-on-write 
operation on files affected by updated or deleted rows. To prevent multiple 
queries from modifying the same files, I have partitioned the target table by 
`query_id`.
   
   Additionally, I’ve configured the target table with `snapshot` isolation, 
which is less strict than the `serializable` isolation level.
   
   However, despite these precautions, the commit operation occasionally fails 
with the following error:
   
   ```
   java.lang.IllegalStateException: Runtime file filtering is not possible: the 
table has been concurrently modified. Row-level operation scan snapshot ID: 
6599699721474649632, current table snapshot ID: 8543071133912689275. If an 
external process modifies the table, enable table caching in the catalog. If 
multiple threads modify the table, use independent Spark sessions in each 
thread.
   ```
   
   @RussellSpitzer explained this on a Slack channel:
   
   > Row level operations do 2 passes over the data, once with the join columns 
and once with the whole row. If the data changes between those passes you get 
the above error. The message suggests using a different thread/session because 
changes within the same session would change the Spark cache and cause the 
error. You would also see this if the Spark Catalog cache is disabled and 
another process modifies the table between the passes.
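
   For context, the error message mentions enabling table caching in the catalog. If I read Iceberg's Spark configuration correctly, the `SparkCatalog` has a `cache-enabled` option that defaults to `true`, so I would expect table caching to already be on. My catalog is configured roughly like the sketch below (`my_catalog` is only a placeholder for the actual catalog name), so please correct me if that assumption is wrong:

   ```python
   # Sketch of the catalog settings I assume are relevant here; "my_catalog" is a
   # placeholder for the catalog name the application actually uses.
   spark = (
       SparkSession.builder
       .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.my_catalog.cache-enabled", "true")  # table caching in the catalog
       .getOrCreate()
   )
   ```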
   
   
   My question is: How can I avoid this error when committing? Specifically, 
what does it mean to "use independent Spark sessions in each thread"?
   
   I have attached a simple PySpark application illustrating the problem, and I'm curious how this example could be modified to prevent the error. How would I introduce "independent Spark sessions" in this example? (My own guess at what that could look like is sketched after the example below.)
   
   
   ```python
   from pyspark.sql import SparkSession
   import pyspark.sql.functions as F
   import time
   
   table_name = "<put target table name here>"  # placeholder: fill in the actual target table identifier
   
   # Initialize Spark Session
   spark = (
       SparkSession.builder
       .appName("streaming merge into")
       .config("spark.sql.shuffle.partitions", "2")
       .config("spark.log.level", "ERROR")
       .getOrCreate()
   )
   
   spark.sparkContext.setLogLevel("ERROR")
   
   spark.sql(f"drop table if exists {table_name}")
   
   spark.sql(f"""
       create table if not exists {table_name} (
       timestamp timestamp,
       value long,
       query_id integer
       )
       using iceberg
       partitioned by (query_id)
       tblproperties (
           "commit.retry.num-retries"="25",
           "write.delete.isolation-level"= "snapshot",
           "write.update.mode"= "copy-on-write",
           "write.update.isolation-level"= "snapshot",
           "write.merge.mode"= "copy-on-write",
           "write.merge.isolation-level"= "snapshot"
       )
   """)
   
   spark.table(table_name).printSchema()
   
   
   # Function to run a streaming query with foreachBatch
   def start_streaming_query(query_id):
   
       # Define a write function using foreachBatch
       def write_to_table(df, batch_id):
           # Publish this micro-batch as a cached global temp view so the
           # MERGE below can reference it by name.
           view_name = f"updates_{query_id}"
           full_view_name = "global_temp." + view_name
           spark.sql(f"uncache table if exists {full_view_name}")
           df.createOrReplaceGlobalTempView(view_name)
           spark.sql(f"cache table {full_view_name}")
           df.show(truncate=False, n=10000)
   
           # Execute the MERGE INTO statement
           merge_query = f"""
           MERGE INTO {table_name} AS t
           USING {full_view_name} AS s
           ON (t.query_id = s.query_id and t.value = s.value)
           WHEN MATCHED THEN UPDATE SET *
           WHEN NOT MATCHED THEN INSERT *
           """
   
           # Retry the MERGE on failure (e.g. a concurrent commit conflict),
           # giving up after 20 attempts.
           num_retry = 0
           while True:
               try:
                   spark.sql(merge_query)
                   break
               except Exception as e:
                   num_retry += 1
                   error_message = str(e)
                   print(f"Query {query_id} is retrying {num_retry}, error was: {error_message}")
                   if num_retry > 20:
                       raise
                   time.sleep(1)  # brief pause before retrying
   
   
   
       # Simulate a streaming DataFrame
       (
           spark
           .readStream
           .format("rate")
           .option("rowsPerSecond", 1)
           .load()
           .withColumn("query_id", F.lit(query_id))
           .writeStream
           .foreachBatch(write_to_table)
           .outputMode("append")
           .trigger(processingTime="10 seconds")
           .start()
       )
   
   num_queries = 10  # Number of concurrent streaming queries
   for i in range(num_queries):
       start_streaming_query(i)
   
   spark.streams.awaitAnyTermination()
   
   spark.stop()
   ```
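
   For what it's worth, the sketch below is my own guess at what "independent Spark sessions in each thread" could mean for this example: give every streaming query its own session via `spark.newSession()` and run the `MERGE` through the session attached to the micro-batch DataFrame rather than the shared `spark` session. I'm not sure this is the intended fix, so please correct me if I'm misreading the suggestion.

   ```python
   # My interpretation only, not a confirmed fix: one Spark session per streaming query.
   def start_streaming_query_with_own_session(query_id):

       # newSession() shares the SparkContext but keeps its own SQL conf,
       # temporary views, and session state.
       session = spark.newSession()

       def write_to_table(df, batch_id):
           view_name = f"updates_{query_id}"
           # Register the batch in the session that produced it and run the
           # MERGE through that same session.
           df.createOrReplaceTempView(view_name)
           df.sparkSession.sql(f"""
               MERGE INTO {table_name} AS t
               USING {view_name} AS s
               ON (t.query_id = s.query_id AND t.value = s.value)
               WHEN MATCHED THEN UPDATE SET *
               WHEN NOT MATCHED THEN INSERT *
           """)

       (
           session
           .readStream
           .format("rate")
           .option("rowsPerSecond", 1)
           .load()
           .withColumn("query_id", F.lit(query_id))
           .writeStream
           .foreachBatch(write_to_table)
           .outputMode("append")
           .trigger(processingTime="10 seconds")
           .start()
       )
   ```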

