meatheadmike commented on issue #11341:
URL: https://github.com/apache/iceberg/issues/11341#issuecomment-2460554308

   This bug does not appear to be limited to AWS or Flink. I'm getting the same error with the following:
   
   ```
   spark.sql(f"""
       CREATE EXTERNAL TABLE IF NOT EXISTS {iceberg_catalog}.{iceberg_db}.{iceberg_table}
       (
           offset BIGINT NOT NULL,
           kafka_timestamp TIMESTAMP NOT NULL,
           partition_key INT NOT NULL,
           domain STRING NOT NULL,
           risk STRING,
           timestamp TIMESTAMP NOT NULL
       )
       USING iceberg
       LOCATION '{iceberg_table_path}'
       TBLPROPERTIES (
               'format-version'='2',
               'write.delete.mode'='merge-on-read',
               'write.merge.mode'='merge-on-read',
               'write.metadata.delete-after-commit.enabled'='true',
               'write.update.mode'='merge-on-read',
               'write.upsert.enabled'='true'
       )
   """).collect()
   
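   # Set 'domain' as the table's identifier (upsert key) field: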
   spark.sql(f"ALTER TABLE {iceberg_catalog}.{iceberg_db}.{iceberg_table} SET 
IDENTIFIER FIELDS domain").collect()
   
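   # For each micro-batch, merge the incoming rows into the Iceberg table keyed on domain: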
   def processMicroBatch(batch_df, batch_id):
       batch_df.printSchema()
       batch_df.show()
       batch_df.createOrReplaceTempView("kafka_source")
       batch_df.sparkSession.sql(f"""
           MERGE INTO `{iceberg_catalog}`.`{iceberg_db}`.`{iceberg_table}` t
           USING (SELECT * FROM `kafka_source`) s
           ON t.domain = s.domain
           WHEN MATCHED THEN UPDATE SET *
           WHEN NOT MATCHED THEN INSERT *
       """)
   
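   # df is the streaming source DataFrame (the Kafka readStream setup is not shown here):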
   df.writeStream \
      .format("iceberg") \
      .trigger(processingTime="1 minutes") \
      .options(**iceberg_options) \
      .outputMode("append") \
      .foreachBatch(processMicroBatch) \
      .start() 
   
   spark.streams.awaitAnyTermination()
   ```
   The returned error is:
   ```
   pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot add fieldId 4 as an identifier field: field does not exist
   ```
   Note that the identifier field (`domain`) is the 4th field in the schema.
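   
   For reference, a quick sanity check (not part of the failing job) would be to describe the table and confirm that the `domain` column really is present as the 4th field:
   ```
   # Illustrative check only: list the table's columns to confirm 'domain' exists
   # and sits in the 4th position, matching the fieldId referenced by the error.
   spark.sql(f"DESCRIBE TABLE {iceberg_catalog}.{iceberg_db}.{iceberg_table}").show(truncate=False)
   ```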
   
   So it would appear that it is currently not possible to do streaming upserts with Spark.

