Thijsvandepoll opened a new issue, #7656: URL: https://github.com/apache/iceberg/issues/7656
### Apache Iceberg version

1.2.1 (latest release)

### Query engine

Spark

### Please describe the bug 🐞

Hi, I have encountered a major bug with `MERGE INTO` in Spark when using a Python UDF. When the column used in the `ON` clause of the `MERGE INTO` has been produced by a UDF, the merge throws an error:

```
java.util.concurrent.ExecutionException: org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for expression: ...
```

I have created an example to showcase the issue:

```python
import os

import pyspark.sql.functions as fn
import pyspark.sql.types as tp
from pyspark.sql import SparkSession

deps = [
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
    "org.apache.iceberg:iceberg-aws:1.2.1",
    "software.amazon.awssdk:bundle:2.17.257",
    "software.amazon.awssdk:url-connection-client:2.17.257",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} pyspark-shell"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "eu-east-1"

catalog = "hive"
spark = (
    SparkSession.builder.appName("Iceberg Reader")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog}.type", "hive")
    .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
    .config(
        f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"
    )
    .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
    .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
    .config("spark.sql.defaultCatalog", catalog)
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()
)


@fn.udf(returnType=tp.IntegerType())
def return_self(inpt):
    # Identity UDF: returns its input unchanged
    return inpt


namespace = "ns"
table = "tab1"

# Create namespace and table
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace};")
spark.sql(f"CREATE TABLE IF NOT EXISTS {namespace}.{table} (col1 INT, col2 INT);")

# Create some data
df = spark.createDataFrame(
    [(1, 2), (2, 3), (3, 4)],
    schema=tp.StructType(
        [
            tp.StructField("col1", tp.IntegerType()),
            tp.StructField("col2", tp.IntegerType()),
        ]
    ),
)

# This does work, because col1 is NOT affected!
df = df.withColumn("col2", return_self("col1"))
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A
    USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)

# This does NOT work!
df = df.withColumn("col1", return_self("col1"))
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A
    USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)
```

To summarize:

1. The UDF is a trivial identity function: it performs no transformation and returns its input unchanged.
2. When `col2` is transformed and the `MERGE INTO` joins on `col1`, everything works and the data is inserted normally.
3. When `col1` is "transformed" by passing it through the UDF, the `MERGE INTO` fails with the error:

```
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for expression: return_self(input[0, int, true])
```

Somehow the merge cannot deal with these UDFs. Persisting the data, which breaks the computation chain, is a workaround:

```python
# Workaround: materialize the UDF output before the merge
df = df.withColumn("col1", return_self("col1"))
df.cache()
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A
    USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)
```

The workaround is annoying though, especially for very large data, where you would probably have to write the data out somewhere first.
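For data too large to cache, one hypothetical variant of the same workaround is to write the transformed source to a scratch table and merge from that, so the UDF is fully evaluated before `MERGE INTO` ever sees the plan. This is only a sketch under that assumption; the helper names (`build_merge_sql`, `merge_via_scratch_table`) and the scratch table name are invented for illustration, not part of any Iceberg API:

```python
def build_merge_sql(target, source):
    """Build the same MERGE statement as in the report, parameterized
    over the target table and the source to merge from."""
    return f"""
    MERGE INTO {target} A
    USING (SELECT * FROM {source}) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """


def merge_via_scratch_table(spark, df, target, scratch="ns.tmp_merge_source"):
    """Materialize the UDF output into a scratch table, then merge from it.

    Unlike df.cache(), this spills to storage, so it should also work when
    the source does not fit in cluster memory. `spark` and `df` are assumed
    to be a live SparkSession and DataFrame; nothing runs at import time.
    """
    # Writing the DataFrame forces full evaluation of the Python UDF, so
    # the plan read by MERGE INTO no longer contains a PythonUDF expression.
    df.writeTo(scratch).createOrReplace()
    spark.sql(build_merge_sql(target, scratch))
    spark.sql(f"DROP TABLE IF EXISTS {scratch}")
```

Whether the extra write is cheaper than caching depends on the data size and cluster memory; for small sources, `df.cache()` as above is simpler.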
Does someone have a better idea of what is going on here? Thanks!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org