fengjiajie opened a new pull request, #10695: URL: https://github.com/apache/iceberg/pull/10695
In my use case with Iceberg 1.3, I have a Flink `function-1` that outputs a `DataStream<DataFile>`, which is then processed by the next function. The simplified code for `function-1` is as follows:

```java
// Inside function-1:
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 234L);
DataFile dataFile = DataFiles.builder(icebergTable.spec())
    .withMetrics(new Metrics(123L, columnSizes, ...))
    ...
    .build();

// Move file to new path, then rebuild DataFile
DataFile newDataFile = DataFiles.builder(icebergTable.spec())
    .copy(dataFile)
    .withPath("file:///new_path")
    .build();
```

If I return `dataFile`, Flink's Kryo serializer deserializes it correctly in the next function. However, if I return `newDataFile` (rebuilt with `copy`), Kryo fails with the following exception:

```
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    ...
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    ... 4 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
columnSizes (org.apache.iceberg.GenericDataFile)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ...
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1459)
    ...
```

The trace shows Kryo calling `put` on a `Collections$UnmodifiableMap` while restoring the `columnSizes` field of `GenericDataFile`.

This issue arises in Iceberg 1.5 but not in 1.3. The root cause lies in the `toReadableMap` method of `org.apache.iceberg.BaseFile`:

```java
// Iceberg 1.3:
private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
  return map instanceof SerializableMap ? ((SerializableMap) map).immutableMap() : map;
}

// Iceberg 1.5:
private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
  if (map == null) {
    return null;
  } else if (map instanceof SerializableMap) {
    return ((SerializableMap<K, V>) map).immutableMap();
  } else {
    return Collections.unmodifiableMap(map);
  }
}
```

In Iceberg 1.5, `toReadableMap` wraps any plain map in `Collections.unmodifiableMap`. A `DataFile` rebuilt through `copy` therefore carries unmodifiable metric maps, and deserializing it ends in an `UnsupportedOperationException`. Returning an unmodifiable map from `toReadableMap` seems correct in itself, but the `copy` operation might need to rebuild these maps as regular mutable maps to avoid this issue.
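To illustrate the failure mode in isolation, here is a minimal sketch that uses only the JDK (no Iceberg or Kryo classes). It assumes, as the stack trace above suggests, that the deserializer repopulates the map by calling `put`. The class name and the helpers `toMutableCopy` and `acceptsPut` are hypothetical names chosen for this example; this is not the actual change made in this pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapSketch {

  // Hypothetical helper: returns a mutable HashMap copy, or null for a null input.
  static <K, V> Map<K, V> toMutableCopy(Map<K, V> map) {
    return map == null ? null : new HashMap<>(map);
  }

  // Mimics a put-based deserializer: returns false if the map rejects writes.
  static <K, V> boolean acceptsPut(Map<K, V> map, K key, V value) {
    try {
      map.put(key, value);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> columnSizes = new HashMap<>();
    columnSizes.put(1, 234L);

    // Same kind of wrapper that toReadableMap returns for a plain map.
    Map<Integer, Long> readOnly = Collections.unmodifiableMap(columnSizes);

    System.out.println(acceptsPut(readOnly, 2, 567L));                // prints "false"
    System.out.println(acceptsPut(toMutableCopy(readOnly), 2, 567L)); // prints "true"
  }
}
```

Copying into a `HashMap` keeps the entries but drops the read-only wrapper, which is the property a `put`-based deserializer depends on.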