rohitanil opened a new issue, #11843:
URL: https://github.com/apache/iceberg/issues/11843

   ### Query engine
   
   Spark
   
   ### Question
   
   I am running Spark with Iceberg on Docker, using a REST catalog, to read from a Kafka topic, apply some transformations, and write the results out as Iceberg tables. I can print the transformed data to the console, but the query throws an error when I write to Iceberg.
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import col, from_json, min, max, avg, window
   from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType
   
   if __name__ == "__main__":
       spark = SparkSession.builder.appName("Kafka-Iceberg-Stream-Processor").getOrCreate()
       spark.sparkContext.setLogLevel("ERROR")
   
       schema = StructType([
           StructField("BTC_EUR", DoubleType(), True),
           StructField("BTC_INR", DoubleType(), True),
           StructField("BTC_USD", DoubleType(), True),
           StructField("ETH_EUR", DoubleType(), True),
           StructField("ETH_INR", DoubleType(), True),
           StructField("ETH_USD", DoubleType(), True),
           StructField("timestamp", TimestampType(), True)
       ])
   
       kafka_df = spark.readStream \
           .format("kafka") \
           .option("kafka.bootstrap.servers", "192.168.1.16:9092") \
           .option("subscribe", "crypto") \
           .option("startingOffsets", "earliest") \
           .load()
   
       parsed_df = kafka_df.selectExpr(
           "CAST(value AS STRING) AS value",
           "timestamp"
       ).withColumn("parsed_json", from_json(col("value"), schema)) \
           .select(
           col("parsed_json.BTC_EUR").alias("BTC_EUR"),
           col("parsed_json.BTC_INR").alias("BTC_INR"),
           col("parsed_json.BTC_USD").alias("BTC_USD"),
           col("parsed_json.ETH_EUR").alias("ETH_EUR"),
           col("parsed_json.ETH_INR").alias("ETH_INR"),
           col("parsed_json.ETH_USD").alias("ETH_USD"),
           col("parsed_json.timestamp").alias("event_time"),
           col("timestamp").alias("processing_time")
       )
   
       windowed_df = parsed_df \
           .withWatermark("event_time", "1 minute") \
           .groupBy(window(col("event_time"), "5 minutes", "5 minutes")) \
           .agg(
               max("BTC_EUR").alias("max_BTC_EUR"),
               min("BTC_EUR").alias("min_BTC_EUR"),
               avg("BTC_EUR").alias("avg_BTC_EUR"),
               max("BTC_INR").alias("max_BTC_INR"),
               min("BTC_INR").alias("min_BTC_INR"),
               avg("BTC_INR").alias("avg_BTC_INR"),
               max("BTC_USD").alias("max_BTC_USD"),
               min("BTC_USD").alias("min_BTC_USD"),
               avg("BTC_USD").alias("avg_BTC_USD"),
               max("ETH_EUR").alias("max_ETH_EUR"),
               min("ETH_EUR").alias("min_ETH_EUR"),
               avg("ETH_EUR").alias("avg_ETH_EUR"),
               max("ETH_INR").alias("max_ETH_INR"),
               min("ETH_INR").alias("min_ETH_INR"),
               avg("ETH_INR").alias("avg_ETH_INR"),
               max("ETH_USD").alias("max_ETH_USD"),
               min("ETH_USD").alias("min_ETH_USD"),
               avg("ETH_USD").alias("avg_ETH_USD")
           ).withColumn("window_start", col("window.start")) \
           .withColumn("window_end", col("window.end")) \
           .drop("window")  # Dropping the original window column
   
       # Create the table if it doesn't exist
       spark.sql("""
           CREATE TABLE IF NOT EXISTS rest.db.crypto_metrics (
               window_start TIMESTAMP,
               window_end TIMESTAMP,
               max_BTC_EUR DOUBLE,
               min_BTC_EUR DOUBLE,
               avg_BTC_EUR DOUBLE,
               max_BTC_INR DOUBLE,
               min_BTC_INR DOUBLE,
               avg_BTC_INR DOUBLE,
               max_BTC_USD DOUBLE,
               min_BTC_USD DOUBLE,
               avg_BTC_USD DOUBLE,
               max_ETH_EUR DOUBLE,
               min_ETH_EUR DOUBLE,
               avg_ETH_EUR DOUBLE,
               max_ETH_INR DOUBLE,
               min_ETH_INR DOUBLE,
               avg_ETH_INR DOUBLE,
               max_ETH_USD DOUBLE,
               min_ETH_USD DOUBLE,
               avg_ETH_USD DOUBLE
           ) USING iceberg
       """)
   
       spark.sql("SHOW TABLES IN rest.db").show()
       windowed_df.printSchema()
       # Write stream data to Iceberg
       query = windowed_df.writeStream \
           .outputMode("complete") \
           .format("iceberg") \
           .option("path", "rest.db.crypto_metrics") \
           .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics") \
           .trigger(processingTime="1 minute") \
           .start()
   
       query.awaitTermination()
   ```
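
   For reference, the console sink works with the same DataFrame, which is how I verified the transformations. A minimal sketch of that variant (the `debug_query` name and the separate checkpoint path are just illustrative):
   ```
   # Console sink variant used to verify the aggregation output; this runs
   # without errors. The checkpoint path is kept separate from the Iceberg one.
   debug_query = windowed_df.writeStream \
       .outputMode("complete") \
       .format("console") \
       .option("truncate", "false") \
       .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics_console") \
       .trigger(processingTime="1 minute") \
       .start()

   debug_query.awaitTermination()
   ```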
   I am submitting the Spark job with the following configuration:
   ```
   spark-submit \
       --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.rest.type=rest \
       --conf spark.sql.catalog.rest.uri=http://rest:8181 \
       --conf spark.sql.catalog.rest.warehouse=$PWD/warehouse \
       --conf spark.sql.defaultCatalog=rest \
       test.py
   ```
   **Error**
   ```
   pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 69f9d70d-8718-4417-820d-6a2dd65036ae, runId = 3abf4234-26ea-4244-bc01-32eef2207d17] terminated with exception: Field max_BTC_INR not found in source schema
   ```
   
   What am I possibly doing wrong?
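
   In case it helps narrow things down, one variant I can think of (an untested sketch, not something I have confirmed fixes it) is selecting the columns in the table's declared order and writing by table name via `toTable` instead of the `path` option:
   ```
   # Untested sketch: align the DataFrame column order with the table schema,
   # then write by table name. If the schema changed between runs, a fresh
   # checkpointLocation may also be needed, since the checkpoint pins the
   # earlier run's state.
   table_columns = [f.name for f in spark.table("rest.db.crypto_metrics").schema.fields]
   aligned_df = windowed_df.select(*table_columns)

   query = aligned_df.writeStream \
       .outputMode("complete") \
       .format("iceberg") \
       .option("checkpointLocation", "/tmp/spark/checkpoints/crypto_metrics") \
       .trigger(processingTime="1 minute") \
       .toTable("rest.db.crypto_metrics")
   ```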
   

