pvary commented on code in PR #9719: URL: https://github.com/apache/iceberg/pull/9719#discussion_r1501356608
########## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java: ########## @@ -193,6 +195,122 @@ public ParquetValueReader<?> map( ParquetValueReaders.option(valueType, valueD, valueReader)); } + private static class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> { + + private final PrimitiveType primitive; + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + + LogicalTypeAnnotationParquetValueReaderVisitor( + PrimitiveType primitive, + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected) { + this.primitive = primitive; + this.desc = desc; + this.expected = expected; + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new StringReader(desc)); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new StringReader(desc)); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new StringReader(desc)); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + DecimalLogicalTypeAnnotation decimalLogicalType) { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new BinaryDecimalReader( + desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new LongDecimalReader( + desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new IntegerDecimalReader( + desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale())); + } + + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return Optional.of(new MillisTimeReader(desc)); + } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + return Optional.of(new LossyMicrosToMillisTimeReader(desc)); + } + + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType); + } + + @Override + public Optional<ParquetValueReader<?>> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + if (timestampLogicalType.isAdjustedToUTC()) { + return Optional.of(new MillisToTimestampTzReader(desc)); + } else { + return Optional.of(new MillisToTimestampReader(desc)); + } + } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + if (timestampLogicalType.isAdjustedToUTC()) { + return Optional.of(new MicrosToTimestampTzReader(desc)); + } else { + return Optional.of(new MicrosToTimestampReader(desc)); + } + } + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); Review Comment: Nit: newline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org