baotran306 commented on issue #12738:
URL: https://github.com/apache/iceberg/issues/12738#issuecomment-2818189800

   I also tested `CREATE OR REPLACE TABLE` to replace my table with a new schema 
and partition spec. 
   
   In this case, the table `local.risk_mon_out.test_change_1` is partitioned by 
the `category` column, and I tried to replace it with a new schema partitioned 
by the `vendor_id` column.
   
![Image](https://github.com/user-attachments/assets/712f3b17-9def-4fca-ac8a-c576ac80f647)
   
   The Iceberg documentation says: "The schema and partition spec will be 
replaced if changed." So I expected the schema and partition spec to be replaced 
entirely. But it doesn't work: Iceberg still tries to find the source column for 
the old partition field `category`, even though that column no longer exists in 
the new schema and partition spec.
   
   My code:
   ```python
   import pyspark
   from pyspark.conf import SparkConf
   from pyspark.sql import SparkSession
   
   SPARK_VERSION = pyspark.__version__
   SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])
   ICEBERG_VERSION = "1.7.2"
   LOCAL_PATH = "my_downloaded_jar_path"
   
   conf_local = {
       "spark.jars": f"{LOCAL_PATH}/jars/iceberg-spark-runtime-3.5_2.12-1.7.2.jar,{LOCAL_PATH}/jars/iceberg-aws-bundle-1.7.2.jar",
       "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
       "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
       "spark.sql.catalog.local.type": "hadoop",
       "spark.sql.catalog.local.warehouse": "warehouse2"
   }
   spark_config = SparkConf().setMaster('local').setAppName("Iceberg-REST")
   for k, v in conf_local.items():
       spark_config = spark_config.set(k, v)
   
   spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
   
   # Create namespace, partition table, insert data
   spark.sql("CREATE NAMESPACE IF NOT EXISTS risk_mon_out")
   spark.sql("""
       CREATE TABLE local.risk_mon_out.test_change_1(id int, category string, event_time timestamp)
       USING iceberg
       PARTITIONED BY (category)
   """)
   spark.sql("""
       INSERT INTO local.risk_mon_out.test_change_1 VALUES
           (5, "HOVal", TIMESTAMP '2025-03-01 11:03:09'),
           (6, "NextMi", TIMESTAMP '2025-03-01 12:19:29'),
           (7, "OGel", TIMESTAMP '2025-01-09 06:43:17'),
           (8, "AAAA", TIMESTAMP '2025-01-09 06:05:29')
   """)
   spark.sql("SELECT * FROM local.risk_mon_out.test_change_1").show()
   
   # Create the source table used by the CTAS below
   spark.sql("CREATE TABLE local.risk_mon_out.data_test_1(vendor_id int, text string) USING iceberg")
   spark.sql("INSERT INTO local.risk_mon_out.data_test_1 VALUES (1, 'ABC'), (2, 'PSZ')")
   spark.sql("SELECT * FROM local.risk_mon_out.data_test_1").show()
   
   
   # Try to replace the table with a new schema and partition spec; this fails
   spark.sql("""
           CREATE OR REPLACE TABLE local.risk_mon_out.test_change_1
           USING iceberg
           PARTITIONED BY (vendor_id)
           AS SELECT vendor_id, text from local.risk_mon_out.data_test_1
   """)
   ``` 
   
   Error message: 
   ```
   org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: category: identity(2)
        at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
        at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:636)
        at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:617)
        at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
        at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
        at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
        at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:99)
        at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
        at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
        at org.apache.iceberg.SerializableTable.lambda$specs$1(SerializableTable.java:196)
        at java.base/java.util.HashMap.forEach(HashMap.java:1429)
        at org.apache.iceberg.SerializableTable.specs(SerializableTable.java:194)
        at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:674)
        at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:668)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
   ```
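
Reading the stack trace, the failure is raised while `SerializableTable.specs` parses the stored partition specs through `PartitionSpecParser.fromJson`, and binding the old spec (`category`, source id 2) fails. My guess, which I have not verified against the Iceberg source, is that the old spec is being bound against the replaced schema, where field id 2 no longer exists. The pure-Python toy model below only illustrates that guess; `PartitionField`, `bind_spec`, `ValidationError` and the new-schema field ids 4 and 5 are hypothetical names and values, not Iceberg APIs or real metadata.

```python
from __future__ import annotations

from dataclasses import dataclass


class ValidationError(Exception):
    """Hypothetical stand-in for Iceberg's ValidationException."""


@dataclass(frozen=True)
class PartitionField:
    field_id: int   # partition field id, e.g. 1000
    name: str       # partition field name, e.g. "category"
    transform: str  # transform name, e.g. "identity"
    source_id: int  # id of the source column in the table schema


def bind_spec(schema: dict[int, str], fields: list[PartitionField]) -> list[PartitionField]:
    """Bind a stored spec to a schema; fail if any source column id is missing."""
    for f in fields:
        if f.source_id not in schema:
            raise ValidationError(
                "Cannot find source column for partition field: "
                f"{f.field_id}: {f.name}: {f.transform}({f.source_id})"
            )
    return fields


# Schema and spec of the original table (field ids as implied by the error message).
old_schema = {1: "id", 2: "category", 3: "event_time"}
old_spec = [PartitionField(1000, "category", "identity", 2)]

# Schema and spec after the replace; ids 4, 5 and 1001 are assumed for illustration.
new_schema = {4: "vendor_id", 5: "text"}
new_spec = [PartitionField(1001, "vendor_id", "identity", 4)]

bind_spec(old_schema, old_spec)  # old spec binds to the old schema
bind_spec(new_schema, new_spec)  # new spec binds to the new schema

try:
    bind_spec(new_schema, old_spec)  # old spec against the new schema fails
except ValidationError as exc:
    message = str(exc)
    print(message)
```

In this model each spec binds cleanly to the schema it was created with, and only the combination of old spec and new schema fails, which is the same message I get from Spark.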
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

