mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1865523100


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##########
@@ -310,10 +366,16 @@ public void writeSnapshot(DataOutputView out) throws 
IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + 
readVersion);
+      switch (readVersion) {
+        case 1:
+          readV1(in);
+          this.version = 1;
+          break;
+        case 2:
+          readV1(in);

Review Comment:
   Can we remove the version suffix in the method name? It handles both 
versions.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java:
##########
@@ -93,15 +126,35 @@ public void serialize(CompletedStatistics record, 
DataOutputView target) throws
 
   @Override
   public CompletedStatistics deserialize(DataInputView source) throws 
IOException {
+    long checkpointId = source.readLong();
+    changeSortKeySerializerVersion(source.readInt());

Review Comment:
   AFAIK We were not writing the serializer version before this change. So not 
sure this works when restoring the serializer to read old data.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##########
@@ -73,12 +73,17 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
       return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated 
statistics", e);
+    } catch (Exception e) {
+      try {
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        return statisticsSerializer.deserializeV1(input);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated 
statistics", ioException);
+      }

Review Comment:
   I see, we're retrying here in case the restore fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to