rohitanil opened a new issue, #11843: URL: https://github.com/apache/iceberg/issues/11843
### Query engine

Spark

### Question

I am using a REST catalog and Spark with Iceberg on Docker to read from a Kafka topic, apply some transformations, and write the results out as Iceberg tables. I am able to print the transformed data to the console, but the query throws an error when I write to Iceberg:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, min, max, avg, window
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Kafka-Iceberg-Stream-Processor").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType([
        StructField("BTC_EUR", DoubleType(), True),
        StructField("BTC_INR", DoubleType(), True),
        StructField("BTC_USD", DoubleType(), True),
        StructField("ETH_EUR", DoubleType(), True),
        StructField("ETH_INR", DoubleType(), True),
        StructField("ETH_USD", DoubleType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.1.16:9092") \
        .option("subscribe", "crypto") \
        .option("startingOffsets", "earliest") \
        .load()

    parsed_df = kafka_df.selectExpr(
        "CAST(value AS STRING) AS value",
        "timestamp"
    ).withColumn("parsed_json", from_json(col("value"), schema)) \
        .select(
            col("parsed_json.BTC_EUR").alias("BTC_EUR"),
            col("parsed_json.BTC_INR").alias("BTC_INR"),
            col("parsed_json.BTC_USD").alias("BTC_USD"),
            col("parsed_json.ETH_EUR").alias("ETH_EUR"),
            col("parsed_json.ETH_INR").alias("ETH_INR"),
            col("parsed_json.ETH_USD").alias("ETH_USD"),
            col("parsed_json.timestamp").alias("event_time"),
            col("timestamp").alias("processing_time")
        )

    windowed_df = parsed_df \
        .withWatermark("event_time", "1 minute") \
        .groupBy(window(col("event_time"), "5 minutes", "5 minutes")) \
        .agg(
            max("BTC_EUR").alias("max_BTC_EUR"),
            min("BTC_EUR").alias("min_BTC_EUR"),
            avg("BTC_EUR").alias("avg_BTC_EUR"),
            max("BTC_INR").alias("max_BTC_INR"),
            min("BTC_INR").alias("min_BTC_INR"),
            avg("BTC_INR").alias("avg_BTC_INR"),
            max("BTC_USD").alias("max_BTC_USD"),
            min("BTC_USD").alias("min_BTC_USD"),
            avg("BTC_USD").alias("avg_BTC_USD"),
            max("ETH_EUR").alias("max_ETH_EUR"),
            min("ETH_EUR").alias("min_ETH_EUR"),
            avg("ETH_EUR").alias("avg_ETH_EUR"),
            max("ETH_INR").alias("max_ETH_INR"),
            min("ETH_INR").alias("min_ETH_INR"),
            avg("ETH_INR").alias("avg_ETH_INR"),
            max("ETH_USD").alias("max_ETH_USD"),
            min("ETH_USD").alias("min_ETH_USD"),
            avg("ETH_USD").alias("avg_ETH_USD")
        ).withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window")  # Drop the original struct-typed window column

    # Create the table if it doesn't exist
    spark.sql("""
        CREATE TABLE IF NOT EXISTS rest.db.crypto_metrics (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            max_BTC_EUR DOUBLE,
            min_BTC_EUR DOUBLE,
            avg_BTC_EUR DOUBLE,
            max_BTC_INR DOUBLE,
            min_BTC_INR DOUBLE,
            avg_BTC_INR DOUBLE,
            max_BTC_USD DOUBLE,
            min_BTC_USD DOUBLE,
            avg_BTC_USD DOUBLE,
            max_ETH_EUR DOUBLE,
            min_ETH_EUR DOUBLE,
            avg_ETH_EUR DOUBLE,
            max_ETH_INR DOUBLE,
            min_ETH_INR DOUBLE,
            avg_ETH_INR DOUBLE,
            max_ETH_USD DOUBLE,
            min_ETH_USD DOUBLE,
            avg_ETH_USD DOUBLE
        ) USING iceberg
    """)

    spark.sql("SHOW TABLES IN rest.db").show()
    windowed_df.printSchema()

    # Write stream data to Iceberg
    query = windowed_df.writeStream \
        .outputMode("complete") \
        .format("iceberg") \
        .option("path", "rest.db.crypto_metrics") \
        .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics") \
        .trigger(processingTime="1 minute") \
        .start()

    query.awaitTermination()
```
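For reference, the console version of the same query runs fine. A minimal sketch of what that looks like (assumes the `windowed_df` defined above; only the sink differs from the Iceberg write):

```python
# Sketch of the console-sink variant that prints fine (assumes the
# windowed_df defined above; only the sink differs from the Iceberg write).
console_query = windowed_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="1 minute") \
    .start()

console_query.awaitTermination()
```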
And I am submitting the Spark job with the following configuration:

```bash
spark-submit \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.rest.type=rest \
  --conf spark.sql.catalog.rest.uri=http://rest:8181 \
  --conf spark.sql.catalog.rest.warehouse=$PWD/warehouse \
  --conf spark.sql.defaultCatalog=rest \
  test.py
```

**Error**

```
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 69f9d70d-8718-4417-820d-6a2dd65036ae, runId = 3abf4234-26ea-4244-bc01-32eef2207d17] terminated with exception: Field max_BTC_INR not found in source schema
```

What am I possibly doing wrong?
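In case it is a column-name or case mismatch between the table and the streaming DataFrame, here is a quick check I can run (a sketch; it reuses the `spark` session and `windowed_df` from the script above, after the `CREATE TABLE` has run):

```python
# Sketch: diff the streaming DataFrame's columns against the Iceberg
# table's columns, case-sensitively, to see which field the sink thinks
# is missing. Reuses spark and windowed_df from the script above.
table_cols = {f.name for f in spark.table("rest.db.crypto_metrics").schema.fields}
stream_cols = {f.name for f in windowed_df.schema.fields}

print("only in table: ", sorted(table_cols - stream_cols))
print("only in stream:", sorted(stream_cols - table_cols))
```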