mxm commented on code in PR #14245:
URL: https://github.com/apache/iceberg/pull/14245#discussion_r2622323561


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java:
##########
@@ -41,30 +48,116 @@
 public class RowDataToAvroGenericRecordConverter implements Function<RowData, GenericRecord> {
   private final RowDataToAvroConverters.RowDataToAvroConverter converter;
   private final Schema avroSchema;
+  private final Set<Integer> timestampNanosFieldIndices;
+  private final RowData.FieldGetter[] fieldGetters;
 
-  private RowDataToAvroGenericRecordConverter(RowType rowType, Schema avroSchema) {
-    this.converter = RowDataToAvroConverters.createConverter(rowType);
+  private RowDataToAvroGenericRecordConverter(
+      RowType converterRowType, Schema avroSchema, Set<Integer> timestampNanosFieldIndices) {
+    this.converter = RowDataToAvroConverters.createConverter(converterRowType);
     this.avroSchema = avroSchema;
+    this.timestampNanosFieldIndices = timestampNanosFieldIndices;
+
+    // Pre-create field getters only if there are timestamp-nanos fields
+    // (otherwise we take the early return path and never use them)
+    if (!timestampNanosFieldIndices.isEmpty()) {
+      this.fieldGetters = new RowData.FieldGetter[converterRowType.getFieldCount()];
+      for (int i = 0; i < converterRowType.getFieldCount(); i++) {
+        LogicalType fieldType = converterRowType.getTypeAt(i);
+        this.fieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+      }
+    } else {
+      this.fieldGetters = null;
+    }
   }
 
   @Override
   public GenericRecord apply(RowData rowData) {
-    return (GenericRecord) converter.convert(avroSchema, rowData);
+    // Pre-process: Flink's RowDataToAvroConverters expects Long (nanoseconds) for timestamp-nanos,
+    // but our RowData has TimestampData. Convert TimestampData to Long nanoseconds.
+    if (timestampNanosFieldIndices.isEmpty()) {
+      return (GenericRecord) converter.convert(avroSchema, rowData);
+    }
+
+    // Create a new GenericRowData with Long values for timestamp-nanos fields
+    GenericRowData processedRowData = new GenericRowData(rowData.getArity());
+    processedRowData.setRowKind(rowData.getRowKind());
+
+    for (int i = 0; i < rowData.getArity(); i++) {
+      if (timestampNanosFieldIndices.contains(i)) {
+        // Convert TimestampData to Long nanoseconds
+        if (!rowData.isNullAt(i)) {
+          TimestampData timestampData = rowData.getTimestamp(i, 9);
+          long nanos =
+              timestampData.getMillisecond() * 1_000_000L + timestampData.getNanoOfMillisecond();
+          processedRowData.setField(i, nanos);
+        } else {
+          processedRowData.setField(i, null);
+        }
+      } else {
+        // Copy other fields as-is using pre-created field getter
+        processedRowData.setField(i, fieldGetters[i].getFieldOrNull(rowData));
+      }
+    }
+
+    return (GenericRecord) converter.convert(avroSchema, processedRowData);
   }
 
   /** Create a converter based on Iceberg schema */
   public static RowDataToAvroGenericRecordConverter fromIcebergSchema(
       String tableName, org.apache.iceberg.Schema icebergSchema) {
-    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    RowType originalRowType = FlinkSchemaUtil.convert(icebergSchema);
     Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
-    return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
+
+    // Detect timestamp-nanos fields and create converter RowType with BIGINT for those fields
+    List<LogicalType> converterFields = Lists.newArrayList();
+    Set<Integer> timestampNanosFieldIndices = new java.util.HashSet<>();

Review Comment:
   NIT: 
   ```suggestion
       Set<Integer> timestampNanosFieldIndices = Sets.newHashSet();
   ```
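   
   For context, here is a minimal standalone sketch (class name invented for illustration, not part of this PR) of the TimestampData-to-nanoseconds arithmetic the converter change above performs:
   ```java
   import java.time.Instant;
   import org.apache.flink.table.data.TimestampData;
   
   public class TimestampNanosSketch {
     public static void main(String[] args) {
       // A timestamp with sub-millisecond precision.
       Instant instant = Instant.parse("2024-01-01T00:00:00.123456789Z");
       TimestampData ts = TimestampData.fromInstant(instant);
   
       // Same arithmetic as in apply(): epoch milliseconds scaled to
       // nanoseconds, plus the nano-of-millisecond remainder.
       long nanos = ts.getMillisecond() * 1_000_000L + ts.getNanoOfMillisecond();
   
       System.out.println(nanos); // 1704067200123456789
     }
   }
   ```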



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java:
##########
@@ -137,12 +137,30 @@ public Type visit(TimeType timeType) {
 
   @Override
   public Type visit(TimestampType timestampType) {
-    return Types.TimestampType.withoutZone();
+    int precision = timestampType.getPrecision();
+    if (precision > 9) {
+      throw new IllegalArgumentException(
+          "Unsupported timestamp precision: " + precision + ". Maximum 
supported precision is 9.");
+    }
+    if (precision == 9) {
+      return Types.TimestampNanoType.withoutZone();
+    } else {

Review Comment:
   Yes, they are possible. They won't occur when the Flink type has been constructed from an Iceberg table, but there are other ways in Flink to create a timestamp type with precision < 9.
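   
   A standalone sketch (class name invented for illustration) of two such ways to end up with a Flink timestamp of precision < 9:
   ```java
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.types.logical.TimestampType;
   
   public class TimestampPrecisionSketch {
     public static void main(String[] args) {
       // Constructed directly: Flink's default timestamp precision is 6.
       TimestampType byDefault = new TimestampType();
       System.out.println(byDefault.getPrecision()); // 6
   
       // Via the Table API, e.g. mirroring a user-declared TIMESTAMP(3) column.
       TimestampType millis = (TimestampType) DataTypes.TIMESTAMP(3).getLogicalType();
       System.out.println(millis.getPrecision()); // 3
     }
   }
   ```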



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

