This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new debbb6df6f [pinot-avro plugin] by default enable logical types support (#15654) debbb6df6f is described below commit debbb6df6f886b0f0bbcf405d70f82e4575b626d Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Tue May 6 14:40:20 2025 +0800 [pinot-avro plugin] by default enable logical types support (#15654) --- .../inputformat/avro/AvroRecordExtractor.java | 2 +- .../avro/AvroRecordExtractorConfig.java | 2 +- .../inputformat/avro/AvroRecordReaderConfig.java | 2 +- .../plugin/inputformat/avro/AvroSchemaUtil.java | 26 ++++++++++++++++++++-- .../avro/AvroRecordToPinotRowGeneratorTest.java | 13 ++++++----- 5 files changed, 34 insertions(+), 11 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java index 2ff8f59f0c..bd62df67c7 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java @@ -39,7 +39,7 @@ import org.apache.pinot.spi.data.readers.RecordExtractorConfig; public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> { private Set<String> _fields; private boolean _extractAll = false; - private boolean _applyLogicalTypes; + private boolean _applyLogicalTypes = true; @Override public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) { diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorConfig.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorConfig.java index da26b1048a..d913cffca2 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorConfig.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorConfig.java @@ -26,7 +26,7 @@ import org.apache.pinot.spi.data.readers.RecordExtractorConfig; * Config for {@link AvroRecordExtractor} */ public class AvroRecordExtractorConfig implements RecordExtractorConfig { - private boolean _enableLogicalTypes = false; + private boolean _enableLogicalTypes = true; @Override public void init(Map<String, String> props) { diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderConfig.java index aa5b642fb6..bcb5ff1d19 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderConfig.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderConfig.java @@ -24,7 +24,7 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig; * Config for {@link AvroRecordReader} */ public class AvroRecordReaderConfig implements RecordReaderConfig { - private boolean _enableLogicalTypes; + private boolean _enableLogicalTypes = true; public boolean isEnableLogicalTypes() { return _enableLogicalTypes; diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java index 3acd615866..fb8b777618 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroSchemaUtil.java @@ -21,6 +21,7 @@ package org.apache.pinot.plugin.inputformat.avro; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.avro.Conversion; import org.apache.avro.Conversions; @@ -183,7 +184,7 @@ public class AvroSchemaUtil { private static Object applySchemaTypeLogic(Schema schema, Object value) { switch (schema.getType()) { case ARRAY: - return processArraySchema((GenericData.Array) value, schema); + return processArraySchema(value, schema); case MAP: return processMapSchema((Map<String, Object>) value, schema); case RECORD: @@ -193,8 +194,26 @@ public class AvroSchemaUtil { } } - private static Object processArraySchema(GenericData.Array array, Schema schema) { + private static Object processArraySchema(Object object, Schema schema) { Schema elementSchema = schema.getElementType(); + if (object == null) { + return null; + } + if (object instanceof List) { + List<Object> list = (List<Object>) object; + list.replaceAll(element -> processElement(element, elementSchema)); + return list; + } + if (object.getClass().isArray()) { + int length = java.lang.reflect.Array.getLength(object); + for (int i = 0; i < length; i++) { + Object element = java.lang.reflect.Array.get(object, i); + java.lang.reflect.Array.set(object, i, processElement(element, elementSchema)); + } + return object; + } + + GenericData.Array array = (GenericData.Array) object; for (int i = 0; i < array.size(); i++) { array.set(i, processElement(array.get(i), elementSchema)); } @@ -203,6 +222,9 @@ public class AvroSchemaUtil { private static Object processMapSchema(Map<String, Object> map, Schema schema) { Schema valueSchema = schema.getValueType(); + if (map == null) { + return null; + } for (Map.Entry<String, Object> entry : map.entrySet()) { entry.setValue(processElement(entry.getValue(), valueSchema)); } diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java index a766508f53..6fc9b3c16c 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.inputformat.avro; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -66,8 +67,8 @@ public class AvroRecordToPinotRowGeneratorTest { GenericRow genericRow = new GenericRow(); // List - genericRecord.put("intMV", List.of(1, 2, 3)); - genericRecord.put("stringMV", List.of("value1", "value2", "value3")); + genericRecord.put("intMV", new ArrayList(List.of(1, 2, 3))); + genericRecord.put("stringMV", new ArrayList(List.of("value1", "value2", "value3"))); avroRecordExtractor.extract(genericRecord, genericRow); assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMV", new Object[]{1, 2, 3}, "stringMV", new Object[]{"value1", "value2", "value3"})); @@ -98,8 +99,8 @@ public class AvroRecordToPinotRowGeneratorTest { // Empty List genericRow.clear(); - genericRecord.put("intMV", List.of()); - genericRecord.put("stringMV", List.of()); + genericRecord.put("intMV", new ArrayList<>()); + genericRecord.put("stringMV", new ArrayList<>()); avroRecordExtractor.extract(genericRecord, genericRow); assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMV", new Object[0], "stringMV", new Object[0])); @@ -131,9 +132,9 @@ public class AvroRecordToPinotRowGeneratorTest { avroRecordExtractor.init(null, null); GenericRow genericRow = new GenericRow(); - Map<String, Integer> intMap = Map.of("v1", 1, "v2", 2, "v3", 3); + Map<String, Integer> intMap = new HashMap(Map.of("v1", 1, "v2", 2, "v3", 3)); genericRecord.put("intMap", intMap); - Map<String, String> stringMap = Map.of("v1", "value1", "v2", "value2", "v3", "value3"); + Map<String, String> stringMap = new HashMap(Map.of("v1", "value1", "v2", "value2", "v3", "value3")); genericRecord.put("stringMap", stringMap); avroRecordExtractor.extract(genericRecord, genericRow); assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMap", intMap, "stringMap", stringMap)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org