CharanHS30 opened a new issue, #10882: URL: https://github.com/apache/iceberg/issues/10882
### Query engine

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.File
import scala.reflect.io.Directory

class SparkIcebergTest
    extends AnyFlatSpec
    with Matchers
    with BeforeAndAfter
    with BeforeAndAfterAll
    with Logging {

  val warehouseDir: String = "src/test/resources/warehouse"

  lazy val conf: SparkConf = new SparkConf()
  // this needs to match the version of iceberg used in Dependencies.scala
  conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0")
  conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  conf.set("spark.sql.catalog.local.type", "hadoop")
  conf.set("spark.sql.catalog.local.warehouse", warehouseDir)

  lazy val spark: SparkSession = SparkSession
    .builder()
    .appName("iceberg-merge-into-test")
    .master("local[*]")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  def cleanWarehouse(): Unit = {
    if (new Directory(new File(warehouseDir)).deleteRecursively()) {
      log.debug("deleted hadoop warehouse dir")
    } else {
      log.debug("nothing to delete")
    }
  }

  override def afterAll(): Unit = {
    spark.stop()
    cleanWarehouse()
    ()
  }

  "instantiating a spark session with iceberg support" should "allow to create an iceberg table" in {
    spark.sql("CREATE TABLE local.db.table (id int, data string) USING iceberg").show()
    spark.sql("show tables in local.db").count() should be(1)
  }

  it should "allow to insert data into an iceberg table" in {
    spark.sql("INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b')")
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table")
    df.count() should be(2)
  }

  it should "allow to insert more data into a table" in {
    spark.sql("INSERT INTO local.db.table VALUES (3, 'a'), (4, 'b')")
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table")
    df.count() should be(4)
  }

  it should "allow to read data from table with spark read format iceberg" in {
    spark.read.format("iceberg").load("local.db.table").createOrReplaceTempView("source")
    val df: DataFrame = spark.sql("SELECT * from source")
    df.count() should be(4)
  }

  it should "allow to merge records into a table without duplicates" in {
    spark.sql("CREATE TABLE local.db.table2 (id int, data string) USING iceberg").show()
    spark.sql("INSERT INTO local.db.table2 VALUES (3, 'a'), (5, 'c')")
    // (3, 'a') would be a duplicate --> should not be written
    spark.sql(
      """MERGE INTO local.db.table2 t
        |USING (SELECT * FROM source) t2
        |ON t.id = t2.id
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table2")
    df.count() should be(5)
  }
}
```

### Question

Spark version: 3.3.2
Iceberg version: 1.3.0

Q1. Do you have `spark.sql.extensions` configured? Ans: Yes.
Q2. Do you have the iceberg-spark-runtime package? Ans: Yes.

I am still getting the error "MERGE INTO TABLE is not supported temporarily." How can I fix this issue?

Hi @ru, I have seen that you have provided many solutions for this, but none of them have worked for my use case. Kindly help me here.
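One cause worth ruling out: Spark raises "MERGE INTO TABLE is not supported temporarily." when the `MergeIntoTable` plan reaches Spark's own planner, i.e. when nothing such as Iceberg's `IcebergSparkSessionExtensions` has rewritten it first. `spark.sql.extensions` is a static SQL configuration, so it only takes effect when the SparkSession (and its SparkContext) is first created; `getOrCreate()` will return an earlier session that was built without the extensions, for example one left behind by another test suite. The sketch below is a minimal standalone check, not a confirmed fix: the object name `CheckIcebergExtensions` and the warehouse path `target/warehouse` are hypothetical, and it assumes `iceberg-spark-runtime-3.3_2.12:1.3.0` is on the driver classpath as a regular dependency rather than fetched via `spark.jars.packages`.

```scala
import org.apache.spark.sql.SparkSession

object CheckIcebergExtensions extends App {
  // Throws ClassNotFoundException if the iceberg-spark-runtime jar is
  // not on the driver classpath at JVM startup.
  Class.forName("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.sql.extensions",
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "target/warehouse")
    .getOrCreate()

  // Static config: if a pre-existing session was reused, this prints
  // whatever that session was created with, and MERGE INTO will still fail.
  println(spark.conf.get("spark.sql.extensions", "<not set>"))

  spark.sql("CREATE TABLE local.db.t (id int, data string) USING iceberg")
  spark.sql("INSERT INTO local.db.t VALUES (1, 'a')")

  // With the extensions active this MERGE runs; without them Spark fails
  // with "MERGE INTO TABLE is not supported temporarily."
  spark.sql(
    """MERGE INTO local.db.t t
      |USING (SELECT 2 AS id, 'b' AS data) s
      |ON t.id = s.id
      |WHEN NOT MATCHED THEN INSERT *""".stripMargin)

  spark.sql("SELECT * FROM local.db.t").show()
  spark.stop()
}
```

If this standalone check passes but the suite still fails, two things worth trying are forking a fresh JVM per suite (so no earlier suite leaves a session behind for `getOrCreate()` to reuse) and dropping `enableHiveSupport()`, which is not needed for a Hadoop catalog.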