stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042792168


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -104,11 +105,38 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
     Types.StructType struct = convert(flinkSchema).asStruct();
     // reassign ids to match the base schema
     Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
+    // reassign doc to match the base schema
+    schema = reassignDoc(schema, baseSchema);
+
     // fix types that can't be represented in Flink (UUID)
     Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
     return freshIdentifierFieldIds(fixedSchema, flinkSchema);
   }
 
+  private static Schema reassignDoc(Schema schema, Schema docSourceSchema) {
+    TypeUtil.CustomOrderSchemaVisitor<Type> visitor = new FlinkFixupDoc(docSourceSchema);
+    return new Schema(
+        visitor
+            .schema(schema, new VisitFuture<>(schema.asStruct(), visitor))
+            .asStructType()
+            .fields());
+  }
+
+  private static class VisitFuture<T> implements Supplier<T> {

Review Comment:
   Not sure we need to copy this class here. Could the supplier just be a lambda?
   ```
   () -> TypeUtil.visit(type, visitor)
   ```
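
To illustrate the suggestion, here is a minimal, self-contained sketch that does not use the Iceberg classes. `SupplierSketch`, its `Visitor` interface, `viaClass`, and `viaLambda` are hypothetical stand-ins invented for this example; only the `VisitFuture` name and the `Supplier` interface come from the diff. It shows, under those assumptions, that a named `Supplier` class which only defers a visit call can be replaced by a lambda with the same behavior.

```java
import java.util.function.Supplier;

public class SupplierSketch {
  // Hypothetical stand-in for a visitor; this is NOT the Iceberg API.
  interface Visitor<T> {
    T visit(String type);
  }

  // Shape of the copied helper: a named class that defers the visit until get() is called.
  static class VisitFuture<T> implements Supplier<T> {
    private final String type;
    private final Visitor<T> visitor;

    VisitFuture(String type, Visitor<T> visitor) {
      this.type = type;
      this.visitor = visitor;
    }

    @Override
    public T get() {
      return visitor.visit(type);
    }
  }

  // Deferred visit built from the named class.
  static <T> Supplier<T> viaClass(String type, Visitor<T> visitor) {
    return new VisitFuture<>(type, visitor);
  }

  // Same deferred visit as a lambda; the visit still runs only when get() is called.
  static <T> Supplier<T> viaLambda(String type, Visitor<T> visitor) {
    return () -> visitor.visit(type);
  }

  public static void main(String[] args) {
    // A toy visitor that returns the length of the type name.
    Visitor<Integer> length = String::length;
    System.out.println(viaClass("struct", length).get());
    System.out.println(viaLambda("struct", length).get());
  }
}
```

Both calls in `main` print the same value. A lambda can only capture locals that are effectively final, which appears to hold for `visitor` in `reassignDoc` since it is assigned once.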



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

