pvary commented on code in PR #14245:
URL: https://github.com/apache/iceberg/pull/14245#discussion_r2611410642


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java:
##########
@@ -41,30 +48,116 @@
 public class RowDataToAvroGenericRecordConverter implements Function<RowData, GenericRecord> {
   private final RowDataToAvroConverters.RowDataToAvroConverter converter;
   private final Schema avroSchema;
+  private final Set<Integer> timestampNanosFieldIndices;
+  private final RowData.FieldGetter[] fieldGetters;
 
-  private RowDataToAvroGenericRecordConverter(RowType rowType, Schema avroSchema) {
-    this.converter = RowDataToAvroConverters.createConverter(rowType);
+  private RowDataToAvroGenericRecordConverter(
+      RowType converterRowType, Schema avroSchema, Set<Integer> timestampNanosFieldIndices) {
+    this.converter = RowDataToAvroConverters.createConverter(converterRowType);
     this.avroSchema = avroSchema;
+    this.timestampNanosFieldIndices = timestampNanosFieldIndices;
+
+    // Pre-create field getters only if there are timestamp-nanos fields
+    // (otherwise we take the early return path and never use them)
+    if (!timestampNanosFieldIndices.isEmpty()) {
+      this.fieldGetters = new RowData.FieldGetter[converterRowType.getFieldCount()];
+      for (int i = 0; i < converterRowType.getFieldCount(); i++) {
+        LogicalType fieldType = converterRowType.getTypeAt(i);
+        this.fieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+      }
+    } else {
+      this.fieldGetters = null;
+    }
   }
 
   @Override
   public GenericRecord apply(RowData rowData) {
-    return (GenericRecord) converter.convert(avroSchema, rowData);
+    // Pre-process: Flink's RowDataToAvroConverters expects Long (nanoseconds) for timestamp-nanos,
+    // but our RowData has TimestampData. Convert TimestampData to Long nanoseconds.
+    if (timestampNanosFieldIndices.isEmpty()) {
+      return (GenericRecord) converter.convert(avroSchema, rowData);
+    }
+
+    // Create a new GenericRowData with Long values for timestamp-nanos fields
+    GenericRowData processedRowData = new GenericRowData(rowData.getArity());
+    processedRowData.setRowKind(rowData.getRowKind());
+
+    for (int i = 0; i < rowData.getArity(); i++) {
+      if (timestampNanosFieldIndices.contains(i)) {
+        // Convert TimestampData to Long nanoseconds
+        if (!rowData.isNullAt(i)) {
+          TimestampData timestampData = rowData.getTimestamp(i, 9);
+          long nanos =
+              timestampData.getMillisecond() * 1_000_000L + timestampData.getNanoOfMillisecond();
+          processedRowData.setField(i, nanos);
+        } else {
+          processedRowData.setField(i, null);
+        }
+      } else {
+        // Copy other fields as-is using pre-created field getter
+        processedRowData.setField(i, fieldGetters[i].getFieldOrNull(rowData));
+      }
+    }
+
+    return (GenericRecord) converter.convert(avroSchema, processedRowData);
   }
 
   /** Create a converter based on Iceberg schema */
   public static RowDataToAvroGenericRecordConverter fromIcebergSchema(
       String tableName, org.apache.iceberg.Schema icebergSchema) {
-    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    RowType originalRowType = FlinkSchemaUtil.convert(icebergSchema);
     Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
-    return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
+
+    // Detect timestamp-nanos fields and create converter RowType with BIGINT for those fields
+    List<LogicalType> converterFields = Lists.newArrayList();
+    Set<Integer> timestampNanosFieldIndices = new java.util.HashSet<>();
+    List<Schema.Field> avroFields = avroSchema.getFields();
+
+    for (int i = 0; i < avroFields.size(); i++) {
+      if (avroFields.get(i).schema().getLogicalType() instanceof LogicalTypes.TimestampNanos) {
+        // Use BIGINT for Flink's converter (it expects Long for timestamp-nanos)
+        converterFields.add(new BigIntType());
+        timestampNanosFieldIndices.add(i);
+      } else {
+        converterFields.add(originalRowType.getTypeAt(i));
+      }
+    }
+
+    RowType converterRowType = RowType.of(converterFields.toArray(new LogicalType[0]));
+    return new RowDataToAvroGenericRecordConverter(
+        converterRowType, avroSchema, timestampNanosFieldIndices);
   }
 
   /** Create a mapper based on Avro schema */
  public static RowDataToAvroGenericRecordConverter fromAvroSchema(Schema avroSchema) {
    DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
     LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
-    RowType rowType = RowType.of(logicalType.getChildren().toArray(new LogicalType[0]));
-    return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
+
+    // Fix up timestamp-nanos fields: Flink's RowDataToAvroConverters expects Long (nanoseconds)
+    // for timestamp-nanos logical type, but our RowData has TimESTAMP(9) with TimestampData.

Review Comment:
   nit: `TimESTAMP`
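   For reference, a minimal standalone sketch of the millis-plus-nano-of-milli arithmetic the patch relies on (the class name and example value are illustrative, not part of the PR):

   ```java
   import java.time.LocalDateTime;
   import org.apache.flink.table.data.TimestampData;

   class TimestampNanosSketch {
     public static void main(String[] args) {
       // Illustrative value with full nanosecond precision: 2024-01-01T00:00:00.123456789
       TimestampData ts =
           TimestampData.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0, 123_456_789));

       // Same arithmetic as the patch: epoch millis scaled to nanos, plus the
       // sub-millisecond remainder, giving epoch nanoseconds as a long.
       long nanos = ts.getMillisecond() * 1_000_000L + ts.getNanoOfMillisecond();

       System.out.println(nanos); // 1704067200123456789 for the value above
     }
   }
   ```

   A `long` of epoch nanoseconds covers timestamps up to the year 2262, which matches the representable range of Avro's timestamp-nanos logical type.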



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

