MarigWeizhi opened a new issue, #16418:
URL: https://github.com/apache/iceberg/issues/16418
### Apache Iceberg version
1.10.1
### Query engine
Spark
### Please describe the bug 🐞
I think RewriteTablePathSparkAction should be safe to execute repeatedly, so
that it can support incremental data migration.
When I execute rewrite_table_path for the second time on a MOR (merge-on-read)
table that has a position delete file, I get the following error:
```
2026-05-19 15:56:23,417 [ERROR] [Executor task launch worker for task 0.0 in stage 57.0 (TID 69)] Executor: Exception in task 0.0 in stage 57.0 (TID 69)
java.io.UncheckedIOException: Failed to create Parquet file
    at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:124)
    at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:214)
    at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:258)
    at org.apache.iceberg.deletes.PositionDeleteWriter.close(PositionDeleteWriter.java:92)
    at org.apache.iceberg.RewriteTablePathUtil.rewritePositionDeleteFile(RewriteTablePathUtil.java:633)
    at org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.lambda$rewritePositionDelete$a4760a1f$1(RewriteTablePathSparkAction.java:673)
    at org.apache.spark.sql.Dataset.$anonfun$foreach$2(Dataset.scala:3507)
    at org.apache.spark.sql.Dataset.$anonfun$foreach$2$adapted(Dataset.scala:3507)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1031)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1031)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /usr/hive/warehouse/staging/orders/data/region=cn/dt=2024-01-01/00059-1-8c49b5a8-f44f-415c-9836-fb8d832aaf5c-00001-deletes.parquet for client 10.xx.x.xx already exists
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:389)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2583)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2479)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:841)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:326)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1039)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:963)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2065)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3047)
```
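Every other artifact written to the staging directory uses overwrite
semantics. Only position delete files are written in create mode, so the
second run fails as soon as it hits a file left over from the first run.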
|Artifact|Write call|Mode|
|-|-|-|
|vN-*.metadata.json|TableMetadataParser.overwrite(...)|overwrite|
|snap-*.avro|ManifestLists.write (Avro default)|overwrite|
|*.avro|ManifestFiles.write (Avro default)|overwrite|
|position delete *.parquet/avro/orc|Parquet.writeDeletes(outputFile)…buildPositionWriter()|CREATE|
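
To make the difference between the two modes concrete, here is a minimal sketch that uses only the JDK's `java.nio.file` API. It is an analogy, not Iceberg code: `WriteModeDemo`, its methods, and the file name are hypothetical names chosen for illustration. In Iceberg terms the same distinction roughly corresponds to `OutputFile.create()` versus `OutputFile.createOrOverwrite()`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; it only mimics the two write modes from the table.
public class WriteModeDemo {

  // Create-only write: throws FileAlreadyExistsException if the target exists.
  static void writeCreateOnly(Path target, String content) throws IOException {
    Files.write(
        target,
        content.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE_NEW,
        StandardOpenOption.WRITE);
  }

  // Overwrite write: replaces any existing file, so a rerun is harmless.
  static void writeOverwrite(Path target, String content) throws IOException {
    Files.write(
        target,
        content.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING,
        StandardOpenOption.WRITE);
  }

  // Writes the same staging file twice and reports whether the second run succeeded.
  static boolean secondRunSucceeds(boolean overwrite) throws IOException {
    Path staging = Files.createTempDirectory("staging");
    Path target = staging.resolve("00001-deletes.parquet");
    for (int run = 1; run <= 2; run++) {
      try {
        if (overwrite) {
          writeOverwrite(target, "run " + run);
        } else {
          writeCreateOnly(target, "run " + run);
        }
      } catch (FileAlreadyExistsException e) {
        // Same failure class of error as the "Caused by" in the stack trace above.
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("create-only, second run ok: " + secondRunSucceeds(false));
    System.out.println("overwrite, second run ok: " + secondRunSucceeds(true));
  }
}
```

With create-only semantics the second pass over the same staging path fails, while the overwrite variant can be rerun any number of times. That rerun safety is the behavior requested here for position delete files.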
How to reproduce
```sql
CREATE TABLE spark_catalog.sales.orders (
  id BIGINT,
  user_id BIGINT,
  amount DOUBLE,
  status STRING,
  region STRING,
  dt STRING,
  updated_at TIMESTAMP
) USING iceberg
PARTITIONED BY (region, dt)
TBLPROPERTIES (
  'format-version' = '2',
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode' = 'merge-on-read'
);

INSERT INTO spark_catalog.sales.orders VALUES
  (1, 1001, 100.5, 'paid', 'cn', '2024-01-01', timestamp'2024-01-01 10:00:00'),
  (2, 1002, 200.0, 'paid', 'cn', '2024-01-01', timestamp'2024-01-01 11:00:00'),
  (3, 1003, 300.0, 'pending', 'cn', '2024-01-02', timestamp'2024-01-02 09:00:00'),
  (4, 1004, 400.0, 'paid', 'us', '2024-01-01', timestamp'2024-01-01 12:00:00'),
  (5, 1005, 500.0, 'cancel', 'us', '2024-01-02', timestamp'2024-01-02 13:00:00'),
  (6, 1006, 600.0, 'paid', 'eu', '2024-01-01', timestamp'2024-01-01 14:00:00');

INSERT INTO spark_catalog.sales.orders VALUES
  (7, 1007, 700.0, 'paid', 'cn', '2024-01-03', timestamp'2024-01-03 10:00:00'),
  (8, 1008, 800.0, 'paid', 'us', '2024-01-03', timestamp'2024-01-03 11:00:00'),
  (9, 1009, 900.0, 'pending', 'jp', '2024-01-03', timestamp'2024-01-03 12:00:00'),
  (10, 1010, 150.0, 'paid', 'cn', '2024-01-02', timestamp'2024-01-02 15:00:00');

DELETE FROM spark_catalog.sales.orders WHERE id = 5;
DELETE FROM spark_catalog.sales.orders WHERE status = 'pending';
UPDATE spark_catalog.sales.orders SET amount = amount * 1.1 WHERE region = 'cn';
```
```sql
-- Execute twice
CALL spark_catalog.system.rewrite_table_path(
  table => 'sales.orders',
  source_prefix => 'hdfs://xxx.xx.xx.xx:xx/usr/hive/warehouse/sales.db/orders',
  target_prefix => 'hdfs://xxxx/iceberg-warehouse/sales/orders',
  staging_location => 'hdfs://xxx.xx.xx.xx:xx/usr/hive/warehouse/staging/orders'
);
```
### Willingness to contribute
- [x] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time