pvary commented on code in PR #14245:
URL: https://github.com/apache/iceberg/pull/14245#discussion_r2611410642
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java:
##########
@@ -41,30 +48,116 @@
 public class RowDataToAvroGenericRecordConverter implements Function<RowData, GenericRecord> {
   private final RowDataToAvroConverters.RowDataToAvroConverter converter;
   private final Schema avroSchema;
+  private final Set<Integer> timestampNanosFieldIndices;
+  private final RowData.FieldGetter[] fieldGetters;
 
-  private RowDataToAvroGenericRecordConverter(RowType rowType, Schema avroSchema) {
-    this.converter = RowDataToAvroConverters.createConverter(rowType);
+  private RowDataToAvroGenericRecordConverter(
+      RowType converterRowType, Schema avroSchema, Set<Integer> timestampNanosFieldIndices) {
+    this.converter = RowDataToAvroConverters.createConverter(converterRowType);
     this.avroSchema = avroSchema;
+    this.timestampNanosFieldIndices = timestampNanosFieldIndices;
+
+    // Pre-create field getters only if there are timestamp-nanos fields
+    // (otherwise we take the early return path and never use them)
+    if (!timestampNanosFieldIndices.isEmpty()) {
+      this.fieldGetters = new RowData.FieldGetter[converterRowType.getFieldCount()];
+      for (int i = 0; i < converterRowType.getFieldCount(); i++) {
+        LogicalType fieldType = converterRowType.getTypeAt(i);
+        this.fieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+      }
+    } else {
+      this.fieldGetters = null;
+    }
   }
 
   @Override
   public GenericRecord apply(RowData rowData) {
-    return (GenericRecord) converter.convert(avroSchema, rowData);
+    // Pre-process: Flink's RowDataToAvroConverters expects Long (nanoseconds) for timestamp-nanos,
+    // but our RowData has TimestampData. Convert TimestampData to Long nanoseconds.
+    if (timestampNanosFieldIndices.isEmpty()) {
+      return (GenericRecord) converter.convert(avroSchema, rowData);
+    }
+
+    // Create a new GenericRowData with Long values for timestamp-nanos fields
+    GenericRowData processedRowData = new GenericRowData(rowData.getArity());
+    processedRowData.setRowKind(rowData.getRowKind());
+
+    for (int i = 0; i < rowData.getArity(); i++) {
+      if (timestampNanosFieldIndices.contains(i)) {
+        // Convert TimestampData to Long nanoseconds
+        if (!rowData.isNullAt(i)) {
+          TimestampData timestampData = rowData.getTimestamp(i, 9);
+          long nanos =
+              timestampData.getMillisecond() * 1_000_000L + timestampData.getNanoOfMillisecond();
+          processedRowData.setField(i, nanos);
+        } else {
+          processedRowData.setField(i, null);
+        }
+      } else {
+        // Copy other fields as-is using pre-created field getter
+        processedRowData.setField(i, fieldGetters[i].getFieldOrNull(rowData));
+      }
+    }
+
+    return (GenericRecord) converter.convert(avroSchema, processedRowData);
   }
 
   /** Create a converter based on Iceberg schema */
   public static RowDataToAvroGenericRecordConverter fromIcebergSchema(
       String tableName, org.apache.iceberg.Schema icebergSchema) {
-    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    RowType originalRowType = FlinkSchemaUtil.convert(icebergSchema);
     Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
-    return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
+
+    // Detect timestamp-nanos fields and create converter RowType with BIGINT for those fields
+    List<LogicalType> converterFields = Lists.newArrayList();
+    Set<Integer> timestampNanosFieldIndices = new java.util.HashSet<>();
+    List<Schema.Field> avroFields = avroSchema.getFields();
+
+    for (int i = 0; i < avroFields.size(); i++) {
+      if (avroFields.get(i).schema().getLogicalType() instanceof LogicalTypes.TimestampNanos) {
+        // Use BIGINT for Flink's converter (it expects Long for timestamp-nanos)
+        converterFields.add(new BigIntType());
+        timestampNanosFieldIndices.add(i);
+      } else {
+        converterFields.add(originalRowType.getTypeAt(i));
+      }
+    }
+
+    RowType converterRowType = RowType.of(converterFields.toArray(new LogicalType[0]));
+    return new RowDataToAvroGenericRecordConverter(
+        converterRowType, avroSchema, timestampNanosFieldIndices);
   }
 
   /** Create a mapper based on Avro schema */
   public static RowDataToAvroGenericRecordConverter fromAvroSchema(Schema avroSchema) {
     DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
     LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
-    RowType rowType = RowType.of(logicalType.getChildren().toArray(new LogicalType[0]));
-    return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
+
+    // Fix up timestamp-nanos fields: Flink's RowDataToAvroConverters expects Long (nanoseconds)
+    // for timestamp-nanos logical type, but our RowData has TimESTAMP(9) with TimestampData.
Review Comment:
nit: `TimESTAMP`
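
For readers following the thread, the TimestampData-to-nanoseconds arithmetic in `apply` can be exercised in isolation. A minimal standalone sketch, not part of the PR; it assumes Flink's table runtime (`org.apache.flink.table.data.TimestampData`) on the classpath, and the class name and sample values are invented for illustration:

import java.time.LocalDateTime;
import org.apache.flink.table.data.TimestampData;

public class TimestampNanosArithmeticSketch {
  public static void main(String[] args) {
    // 2024-01-15T10:30:45.123456789, i.e. 123456789 nanos within the second
    LocalDateTime ldt = LocalDateTime.of(2024, 1, 15, 10, 30, 45, 123_456_789);
    TimestampData ts = TimestampData.fromLocalDateTime(ldt);

    // Same arithmetic as the converter: getMillisecond() alone would truncate
    // the fraction to .123; adding getNanoOfMillisecond() recovers .123456789.
    long nanos = ts.getMillisecond() * 1_000_000L + ts.getNanoOfMillisecond();

    System.out.println(nanos % 1_000_000_000L); // 123456789: nano-of-second preserved
  }
}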
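
The pre-created `RowData.FieldGetter` pattern from the constructor can be sketched the same way (again an illustration, not PR code; the field types and values are made up):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

public class FieldGetterSketch {
  public static void main(String[] args) {
    // Field types of a two-column row: BIGINT, STRING
    LogicalType[] types = {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)};

    // Pre-create one getter per field, as the constructor in the diff does:
    // the dispatch on the logical type happens once, not once per row.
    RowData.FieldGetter[] getters = new RowData.FieldGetter[types.length];
    for (int i = 0; i < types.length; i++) {
      getters[i] = RowData.createFieldGetter(types[i], i);
    }

    RowData source = GenericRowData.of(42L, StringData.fromString("hello"));

    // Copy fields into a new row, null-safely, without knowing their types here
    GenericRowData copy = new GenericRowData(source.getArity());
    for (int i = 0; i < source.getArity(); i++) {
      copy.setField(i, getters[i].getFieldOrNull(source));
    }
    System.out.println(copy); // +I(42,hello)
  }
}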