fengjiajie opened a new pull request, #10695:
URL: https://github.com/apache/iceberg/pull/10695

   In my use case with Iceberg 1.3, I have a Flink `function-1` that outputs a 
`DataStream<DataFile>`, which is then processed by the next function. The 
simplified code for `function-1` is as follows:
   
   ```java
   // Inside function-1:

   Map<Integer, Long> columnSizes = new HashMap<>();
   columnSizes.put(1, 234L);
   DataFile dataFile = DataFiles.builder(icebergTable.spec())
       .withMetrics(new Metrics(123L, columnSizes, ...))
       ...
       .build();

   // Move file to new path, then rebuild DataFile
   DataFile newDataFile = DataFiles.builder(icebergTable.spec())
       .copy(dataFile)
       .withPath("file:///new_path")
       .build();
   ```
   
   If I return `dataFile`, Flink's Kryo framework can deserialize it correctly 
in the next function. However, if I return `newDataFile` (reconstructed with 
`copy`), Kryo fails with the following exception:
   
   ```
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
        ...
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        ... 4 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   columnSizes (org.apache.iceberg.GenericDataFile)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        ...
   Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableMap.put(Collections.java:1459)
        ...
   ```
   
   This issue arises in Iceberg 1.5 but not in 1.3. The root cause lies in the `toReadableMap` method of `org.apache.iceberg.BaseFile`:
   
   ```java
   // Iceberg 1.3:
     private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
       return map instanceof SerializableMap ? ((SerializableMap) map).immutableMap() : map;
     }
   
   // Iceberg 1.5:
     private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
       if (map == null) {
         return null;
       } else if (map instanceof SerializableMap) {
         return ((SerializableMap<K, V>) map).immutableMap();
       } else {
         return Collections.unmodifiableMap(map);
       }
     }
   ```
   
   In Iceberg 1.5, `toReadableMap` wraps any map that is not a `SerializableMap` in `Collections.unmodifiableMap`. The `DataFile` rebuilt with `copy` appears to keep that wrapper, so when Kryo deserializes the `columnSizes` field and calls `put` on it, the wrapper throws `UnsupportedOperationException`. Returning an unmodifiable map from `toReadableMap` seems correct, but the `copy` operation might need to reconstruct these maps as regular mutable maps to avoid this issue.
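
The failure mode and the suggested remedy can be reproduced with plain JDK collections. The following is a minimal sketch, not the actual patch: `toMutableMap` and `acceptsPut` are hypothetical helpers that do not exist in Iceberg, and `acceptsPut` only imitates the `put` call a map deserializer makes when it repopulates a map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class MutableMapCopySketch {

  // Hypothetical helper (not an Iceberg API): copy a possibly read-only map
  // into a plain HashMap, preserving null.
  static <K, V> Map<K, V> toMutableMap(Map<K, V> map) {
    return map == null ? null : new HashMap<>(map);
  }

  // Hypothetical helper: reports whether the map accepts put(), which is the
  // call that fails in the stack trace above.
  static <K, V> boolean acceptsPut(Map<K, V> map, K key, V value) {
    try {
      map.put(key, value);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> columnSizes = new HashMap<>();
    columnSizes.put(1, 234L);

    // What a read-only accessor hands back for a plain HashMap.
    Map<Integer, Long> readOnly = Collections.unmodifiableMap(columnSizes);

    // What a copy step could store instead.
    Map<Integer, Long> copied = toMutableMap(readOnly);

    System.out.println(acceptsPut(readOnly, 2, 5L)); // prints false
    System.out.println(acceptsPut(copied, 2, 5L));   // prints true
  }
}
```

Copying into a `HashMap` costs one extra allocation per map, but the stored field is then a type that a reflective serializer can repopulate, while read accessors remain free to return unmodifiable views.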
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

