This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new cadd61c Fix extract method in AvroRecordExtractor class (#6005) cadd61c is described below commit cadd61c22ba04b83fca7f205e4c0aaae7bde09bc Author: Jialiang Li <j...@linkedin.com> AuthorDate: Sat Sep 12 15:02:31 2020 -0700 Fix extract method in AvroRecordExtractor class (#6005) * Fix extract method in AvroRecordExtractor Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> --- .../inputformat/avro/AvroRecordExtractor.java | 15 +++++---- .../inputformat/avro/AvroRecordExtractorTest.java | 37 +++++++++++++++++++++- .../java/org/apache/pinot/spi/utils/JsonUtils.java | 15 --------- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java index 339ab67..646debc 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java @@ -18,14 +18,14 @@ */ package org.apache.pinot.plugin.inputformat.avro; -import java.util.Map; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordExtractor; import org.apache.pinot.spi.data.readers.RecordExtractorConfig; -import org.apache.pinot.spi.utils.JsonUtils; /** @@ -46,13 +46,14 @@ public class AvroRecordExtractor implements RecordExtractor<GenericRecord> { @Override public GenericRow extract(GenericRecord from, GenericRow to) { if (_extractAll) { - Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from); - jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, AvroUtils.convert(value))); + List<Schema.Field> fields = from.getSchema().getFields(); + for (Schema.Field field : fields) { + String fieldName = field.name(); + to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); + } } else { for (String fieldName : _fields) { - Object value = from.get(fieldName); - Object convertedValue = AvroUtils.convert(value); - to.putValue(fieldName, convertedValue); + to.putValue(fieldName, AvroUtils.convert(from.get(fieldName))); } } return to; diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java index b985349..833e8fc 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java @@ -18,9 +18,13 @@ */ package org.apache.pinot.plugin.inputformat.avro; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -29,8 +33,13 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.testng.Assert; +import org.testng.annotations.Test; import static org.apache.avro.Schema.*; @@ -39,7 +48,7 @@ import static org.apache.avro.Schema.*; * Tests the {@link AvroRecordExtractor} using a schema containing groovy transform functions */ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest { - + private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); private final File _dataFile = new File(_tempDir, "events.avro"); /** @@ -87,4 +96,30 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest { protected boolean testExtractAll() { return true; } + + @Test + public void testDataTypeReturnFromAvroRecordExtractor() + throws IOException { + String testColumnName = "column1"; + long columnValue = 999999999L; + AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor(); + avroRecordExtractor.init(null, null); + + org.apache.pinot.spi.data.Schema pinotSchema = new org.apache.pinot.spi.data.Schema.SchemaBuilder() + .addSingleValueDimension(testColumnName, FieldSpec.DataType.LONG).build(); + Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema); + GenericRecord genericRecord = new GenericData.Record(schema); + genericRecord.put(testColumnName, columnValue); + GenericRow genericRow = new GenericRow(); + + avroRecordExtractor.extract(genericRecord, genericRow); + Assert.assertEquals(columnValue, genericRow.getValue(testColumnName)); + Assert.assertEquals("Long", genericRow.getValue(testColumnName).getClass().getSimpleName()); + + String jsonString = genericRecord.toString(); + Map<String, Object> jsonMap = DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() { + }); + // The data type got changed to Integer, which will then have to trigger the convert method in DataTypeTransformer class. + Assert.assertEquals("Integer", jsonMap.get(testColumnName).getClass().getSimpleName()); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java index f5bf9d3..a8b206d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java @@ -34,9 +34,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.Map; import javax.annotation.Nullable; -import org.apache.avro.generic.GenericRecord; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -193,17 +191,4 @@ public class JsonUtils { throw new IllegalArgumentException(String.format("Unsupported data type %s", dataType)); } } - - /** - * Converts from a GenericRecord to a json map - */ - public static Map<String, Object> genericRecordToJson(GenericRecord genericRecord) { - try { - String jsonString = genericRecord.toString(); - return DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() { - }); - } catch (IOException e) { - throw new IllegalStateException("Caught exception when converting generic record " + genericRecord + " to JSON"); - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org