Thijsvandepoll opened a new issue, #7656:
URL: https://github.com/apache/iceberg/issues/7656

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   Hi, I have encountered a major bug with `MERGE INTO` in Spark when using a Python UDF. The problem is that when the column used in the `ON` clause of `MERGE INTO` has been produced by a UDF, the merge throws an error:
   ```
   java.util.concurrent.ExecutionException: org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for expression: ...
   ```
   
   I have created an example to showcase the issue:
   ```
   import os
   
   import pyspark.sql.functions as fn
   import pyspark.sql.types as tp
   from pyspark.sql import SparkSession
   
   deps = [
       "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
       "org.apache.iceberg:iceberg-aws:1.2.1",
       "software.amazon.awssdk:bundle:2.17.257",
       "software.amazon.awssdk:url-connection-client:2.17.257",
   ]
   os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} pyspark-shell"
   os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
   os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
   os.environ["AWS_REGION"] = "eu-east-1"
   
   catalog = "hive"
   spark = (
       SparkSession.builder.appName("Iceberg Reader")
       .config(
           "spark.sql.extensions",
           "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
       )
       .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
       .config(f"spark.sql.catalog.{catalog}.type", "hive")
       .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
       .config(
           f"spark.sql.catalog.{catalog}.io-impl",
           "org.apache.iceberg.aws.s3.S3FileIO",
       )
       .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
       .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
       .config("spark.sql.defaultCatalog", catalog)
       .config("hive.metastore.uris", "thrift://localhost:9083")
       .enableHiveSupport()
       .getOrCreate()
   )
   
   
   @fn.udf(returnType=tp.IntegerType())
   def return_self(inpt):
       return inpt
   
   
   namespace = "ns"
   table = "tab1"
   
   # Create namespaces and table
   spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace};")
   spark.sql(f"CREATE TABLE IF NOT EXISTS {namespace}.{table} (col1 INT, col2 INT);")
   
   # Create some data
   df = spark.createDataFrame(
       [(1, 2), (2, 3), (3, 4)],
       schema=tp.StructType(
           [
               tp.StructField("col1", tp.IntegerType()),
               tp.StructField("col2", tp.IntegerType()),
           ]
       ),
   )
   
   # This does work, because col1 is NOT affected!
   df = df.withColumn("col2", return_self("col1"))
   df.createOrReplaceTempView("tmp")
   spark.sql(
       f"""
   MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
   ON A.col1 = B.col1
   WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
   WHEN NOT MATCHED THEN INSERT *
   """
   )
   
   # This does NOT work!
   df = df.withColumn("col1", return_self("col1"))
   df.createOrReplaceTempView("tmp")
   spark.sql(
       f"""
   MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
   ON A.col1 = B.col1
   WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
   WHEN NOT MATCHED THEN INSERT *
   """
   )
   ```
   
   1. I created a very simple UDF that does not do any transformation and just returns its input unchanged.
   2. When we transform `col2` and perform the `MERGE INTO` on `col1`, the merge succeeds and the data is inserted normally.
   3. When we "transform" `col1` by passing it through the UDF, the `MERGE INTO` fails with the error:
   ```
   Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for expression: return_self(input[0, int, true])
   ```
   Somehow the merge cannot handle these Python UDFs; a trimmed-down sketch of the same failure, without the Hive/MinIO setup, is included below.
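
   This is a minimal sketch assuming a local Hadoop catalog with a filesystem warehouse; the catalog name `local` and the warehouse path are placeholders for illustration, not my real setup:
   ```
   import os

   import pyspark.sql.functions as fn
   import pyspark.sql.types as tp
   from pyspark.sql import SparkSession

   # Only the Spark runtime jar is needed for a local Hadoop catalog.
   os.environ["PYSPARK_SUBMIT_ARGS"] = (
       "--packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1 pyspark-shell"
   )

   spark = (
       SparkSession.builder.appName("merge-udf-repro")
       .config(
           "spark.sql.extensions",
           "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
       )
       .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.local.type", "hadoop")
       .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
       .getOrCreate()
   )


   @fn.udf(returnType=tp.IntegerType())
   def return_self(inpt):
       return inpt


   spark.sql("CREATE TABLE IF NOT EXISTS local.ns.tab1 (col1 INT, col2 INT) USING iceberg")

   df = spark.createDataFrame([(1, 2), (2, 3)], schema="col1 INT, col2 INT")
   df = df.withColumn("col1", return_self("col1"))  # UDF touches the ON-clause column
   df.createOrReplaceTempView("tmp")

   # The MERGE below should hit the same "Cannot generate code for expression" error.
   spark.sql(
       """
   MERGE INTO local.ns.tab1 A USING (SELECT * FROM tmp) B
   ON A.col1 = B.col1
   WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
   WHEN NOT MATCHED THEN INSERT *
   """
   )
   ```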
   
   Persisting the data to break the computation chain is a workaround:
   ```
   # This is a workaround
   df = df.withColumn("col1", return_self("col1"))
   df.cache()
   df.createOrReplaceTempView("tmp")
   spark.sql(
       f"""
   MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
   ON A.col1 = B.col1
   WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
   WHEN NOT MATCHED THEN INSERT *
   """
   )
   ```
   
   The workaround is annoying though, especially for very large data, where caching everything may not be feasible; probably you then need to write the data out somewhere instead.
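
   In that case, something like the following might work: materialize the UDF output into a separate Iceberg table and merge from it, so the UDF is fully evaluated before the merge runs (the `_staging` table name is just an illustration):
   ```
   # Alternative workaround sketch: write the UDF output out first, then merge
   # from the materialized table instead of from the temp view.
   df = df.withColumn("col1", return_self("col1"))
   df.writeTo(f"{namespace}.{table}_staging").createOrReplace()

   spark.sql(
       f"""
   MERGE INTO {namespace}.{table} A USING {namespace}.{table}_staging B
   ON A.col1 = B.col1
   WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
   WHEN NOT MATCHED THEN INSERT *
   """
   )

   # Clean up the staging table afterwards.
   spark.sql(f"DROP TABLE IF EXISTS {namespace}.{table}_staging")
   ```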
   
   Does someone have a better idea of what is going on here? Thanks!
   

