This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d54b04a  Deep extraction in Avro and Json RecordExtractor (#5492)
d54b04a is described below

commit d54b04a2562f86dfb3adaa02ff400951d8108738
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Fri Jun 5 18:05:29 2020 -0700

    Deep extraction in Avro and Json RecordExtractor (#5492)
    
    - Json handles Map/Collection/single-values
    - Avro handles Map/Collection/GenericData.Record/single-values
    - Added json_format function to convert json object to json string
---
 .../pinot/common/function/JsonFunctions.java       |   9 +
 ...valuatorTest.java => InbuiltFunctionsTest.java} |  59 +++++-
 .../src/test/resources/data/test_data-mv.avro      | Bin 9583323 -> 7700192 
bytes
 .../inputformat/avro/AvroRecordExtractor.java      |   3 +-
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |  67 ++++---
 .../avro/AvroRecordExtractorComplexTypesTest.java  | 209 +++++++++++++++++++++
 .../avro/AvroRecordExtractorMapTypeTest.java       | 115 ------------
 .../inputformat/json/JSONRecordExtractor.java      |  54 +-----
 ...xtractor.java => JSONRecordExtractorUtils.java} |  90 ++++-----
 .../json/JSONRecordExtractorUtilsTest.java         | 120 ++++++++++++
 .../src/test/resources/data/test_data-mv.avro      | Bin 9583323 -> 7700192 
bytes
 .../pinot/spi/data/readers/RecordExtractor.java    |   3 +
 .../data/readers/AbstractRecordExtractorTest.java  |  47 +++--
 13 files changed, 514 insertions(+), 262 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
index 1e5cd75..e0d3b06 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java
@@ -46,4 +46,13 @@ public class JsonFunctions {
     return JsonUtils.objectToString(map);
   }
 
+  /**
+   * Convert object to Json String
+   */
+  @ScalarFunction
+  static String json_format(Object object)
+      throws JsonProcessingException {
+    return JsonUtils.objectToString(object);
+  }
+
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
similarity index 77%
rename from 
pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
index 9de5956..9532ea8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java
@@ -19,19 +19,21 @@
 package org.apache.pinot.core.data.function;
 
 import com.google.common.collect.Lists;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.common.function.DateTimeFunctions;
+import java.util.Map;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 /**
- * Tests the Pinot inbuilt transform functions in {@link DateTimeFunctions} 
which perform date time conversion
+ * Tests the Pinot inbuilt transform functions
  */
-public class DateTimeFunctionEvaluatorTest {
+public class InbuiltFunctionsTest {
 
   @Test(dataProvider = "dateTimeFunctionsTestDataProvider")
   public void testDateTimeTransformFunctions(String transformFunction, 
List<String> arguments, GenericRow row,
@@ -206,4 +208,55 @@ public class DateTimeFunctionEvaluatorTest {
 
     return inputs.toArray(new Object[0][]);
   }
+
+  @Test(dataProvider = "jsonFunctionDataProvider")
+  public void testJsonFunctions(String transformFunction, List<String> 
arguments, GenericRow row, Object result)
+      throws Exception {
+    InbuiltFunctionEvaluator evaluator = new 
InbuiltFunctionEvaluator(transformFunction);
+    Assert.assertEquals(evaluator.getArguments(), arguments);
+    Assert.assertEquals(evaluator.evaluate(row), result);
+  }
+
+  @DataProvider(name = "jsonFunctionDataProvider")
+  public Object[][] jsonFunctionsDataProvider()
+      throws IOException {
+    List<Object[]> inputs = new ArrayList<>();
+
+    // toJsonMapStr
+    GenericRow row0 = new GenericRow();
+    String jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}";
+    row0.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class));
+    inputs.add(new Object[]{"toJsonMapStr(jsonMap)", 
Lists.newArrayList("jsonMap"), row0, jsonStr});
+
+    GenericRow row1 = new GenericRow();
+    jsonStr = 
"{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}";
+    row1.putValue("jsonMap", JsonUtils
+        .stringToObject(jsonStr, Map.class));
+    inputs.add(new Object[]{"toJsonMapStr(jsonMap)", 
Lists.newArrayList("jsonMap"), row1, jsonStr});
+
+    GenericRow row2 = new GenericRow();
+    jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}";
+    row2.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class));
+    inputs.add(new Object[]{"json_format(jsonMap)", 
Lists.newArrayList("jsonMap"), row2, jsonStr});
+
+    GenericRow row3 = new GenericRow();
+    jsonStr = 
"{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}";
+    row3.putValue("jsonMap", JsonUtils
+        .stringToObject(jsonStr, Map.class));
+    inputs.add(new Object[]{"json_format(jsonMap)", 
Lists.newArrayList("jsonMap"), row3, jsonStr});
+
+    GenericRow row4 = new GenericRow();
+    jsonStr = "[{\"one\":1,\"two\":\"too\"},{\"one\":11,\"two\":\"roo\"}]";
+    row4.putValue("jsonMap", JsonUtils
+        .stringToObject(jsonStr, List.class));
+    inputs.add(new Object[]{"json_format(jsonMap)", 
Lists.newArrayList("jsonMap"), row4, jsonStr});
+
+    GenericRow row5 = new GenericRow();
+    jsonStr = 
"[{\"one\":1,\"two\":{\"sub1\":1.1,\"sub2\":1.2},\"three\":[\"a\",\"b\"]},{\"one\":11,\"two\":{\"sub1\":11.1,\"sub2\":11.2},\"three\":[\"aa\",\"bb\"]}]";
+    row5.putValue("jsonMap", JsonUtils
+        .stringToObject(jsonStr, List.class));
+    inputs.add(new Object[]{"json_format(jsonMap)", 
Lists.newArrayList("jsonMap"), row5, jsonStr});
+
+    return inputs.toArray(new Object[0][]);
+  }
 }
diff --git a/pinot-core/src/test/resources/data/test_data-mv.avro 
b/pinot-core/src/test/resources/data/test_data-mv.avro
index 151e091..aff4520 100644
Binary files a/pinot-core/src/test/resources/data/test_data-mv.avro and 
b/pinot-core/src/test/resources/data/test_data-mv.avro differ
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index b9ac3cd..339ab67 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -25,7 +25,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -48,7 +47,7 @@ public class AvroRecordExtractor implements 
RecordExtractor<GenericRecord> {
   public GenericRow extract(GenericRecord from, GenericRow to) {
     if (_extractAll) {
       Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from);
-      jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, 
RecordReaderUtils.convert(value)));
+      jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, 
AvroUtils.convert(value)));
     } else {
       for (String fieldName : _fields) {
         Object value = from.get(fieldName);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index f4ac41c..c089bfc 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -284,59 +285,83 @@ public class AvroUtils {
   }
 
   /**
-   * Converts the value to a single-valued value or a multi-valued value
+   * Converts the value to either a single value (string, number, ByteBuffer), 
a multi value (Object[]) or a Map
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform function operates on it 
in the transformation layer
    */
   public static Object convert(Object value) {
+    if (value == null) {
+      return null;
+    }
     Object convertedValue;
     if (value instanceof Collection) {
       convertedValue = handleMultiValue((Collection) value);
     } else if (value instanceof Map) {
       convertedValue = handleMap((Map) value);
+    } else if(value instanceof GenericData.Record) {
+      convertedValue = handleGenericRecord((GenericData.Record) value);
     } else {
-      convertedValue = handleSingleValue(value);
+      convertedValue = RecordReaderUtils.convertSingleValue(value);
     }
     return convertedValue;
   }
 
   /**
-   * Converts the value to a single-valued value by handling instance of 
ByteBuffer, Number and String
+   * Handles the conversion of each value of the Collection.
+   * Converts the Collection to an Object array
    */
-  public static Object handleSingleValue(@Nullable Object value) {
-    if (value == null) {
+  public static Object handleMultiValue(Collection values) {
+
+    if (values.isEmpty()) {
       return null;
     }
-    if (value instanceof GenericData.Record) {
-      return handleSingleValue(((GenericData.Record) value).get(0));
+    int numValues = values.size();
+    Object[] array = new Object[numValues];
+    int index = 0;
+    for (Object value : values) {
+      Object convertedValue = convert(value);
+      if (convertedValue != null && !convertedValue.toString().equals("")) {
+        array[index++] = convertedValue;
+      }
+    }
+    if (index == numValues) {
+      return array;
+    } else if (index == 0) {
+      return null;
+    } else {
+      return Arrays.copyOf(array, index);
     }
-    return RecordReaderUtils.convertSingleValue(value);
   }
 
   /**
-   * Converts the value to a multi-valued column
+   * Handles the conversion of every value of the Map
    */
-  public static Object handleMultiValue(@Nullable Collection values) {
-    if (values == null || values.isEmpty()) {
+  public static Object handleMap(Map map) {
+    if (map.isEmpty()) {
       return null;
     }
-    int numValues = values.size();
-    List<Object> list = new ArrayList<>(numValues);
-    for (Object value : values) {
-      list.add(handleSingleValue(value));
+
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      convertedMap.put(RecordReaderUtils.convertSingleValue(key), 
convert(map.get(key)));
     }
-    return RecordReaderUtils.convertMultiValue(list);
+    return convertedMap;
   }
 
   /**
-   * Converts the values within the map to single-valued values
+   * Handles the conversion of every field of the GenericRecord
    */
-  public static Object handleMap(@Nullable Map map) {
-    if (map == null || map.isEmpty()) {
+  private static Object handleGenericRecord(GenericData.Record record) {
+    List<Field> fields = record.getSchema().getFields();
+    if (fields.isEmpty()) {
       return null;
     }
 
     Map<Object, Object> convertedMap = new HashMap<>();
-    for (Object key : map.keySet()) {
-      convertedMap.put(RecordReaderUtils.convertSingleValue(key), 
RecordReaderUtils.convertSingleValue(map.get(key)));
+    for (Field field : fields) {
+      String fieldName = field.name();
+      convertedMap.put(fieldName, convert(record.get(fieldName)));
     }
     return convertedMap;
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java
new file mode 100644
index 0000000..d7584ed
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+
+import static org.apache.avro.Schema.*;
+
+
+/**
+ * Tests the {@link AvroRecordExtractor} for Avro Map and Avro Record
+ */
+public class AvroRecordExtractorComplexTypesTest extends 
AbstractRecordExtractorTest {
+
+  private final File _dataFile = new File(_tempDir, "complex.avro");
+  Schema avroSchema;
+  Schema intStringMapAvroSchema;
+  Schema stringIntMapAvroSchema;
+  Schema simpleRecordSchema;
+  Schema complexRecordSchema;
+  Schema complexFieldSchema;
+  Schema complexListSchema;
+
+  @Override
+  protected List<Map<String, Object>> getInputRecords() {
+
+    // map with int keys
+    intStringMapAvroSchema = createMap(create(Type.STRING));
+
+    // map with string keys
+    stringIntMapAvroSchema = createMap(create(Type.INT));
+
+    // simple record - contains a string, long and double array
+    simpleRecordSchema = createRecord("simpleRecord", null, null, false);
+    simpleRecordSchema.setFields(Lists.newArrayList(new Field("simpleField1", 
create(Type.STRING), null, null),
+        new Field("simpleField2", create(Type.LONG), null, null),
+        new Field("simpleList", createArray(create(Type.DOUBLE)), null, 
null)));
+
+    // complex record - contains a string, a complex field (contains int and 
long)
+    complexRecordSchema = createRecord("complexRecord", null, null, false);
+    complexFieldSchema = createRecord("complexField", null, null, false);
+    complexFieldSchema.setFields(Lists.newArrayList(new Field("field1", 
create(Type.INT), null, null),
+        new Field("field2", create(Type.LONG), null, null)));
+    complexRecordSchema.setFields(Lists.newArrayList(new Field("simpleField", 
create(Type.STRING), null, null),
+        new Field("complexField", complexFieldSchema, null, null)));
+
+    // complex list element - each element contains a record of int and long
+    complexListSchema = createRecord("complexList", null, null, false);
+    complexListSchema.setFields(Lists.newArrayList(new Field("field1", 
create(Type.INT), null, null),
+        new Field("field2", create(Type.LONG), null, null)));
+
+    Field map1Field = new Field("map1", intStringMapAvroSchema, null, null);
+    Field map2Field = new Field("map2", stringIntMapAvroSchema, null, null);
+    Field simpleRecordField = new Field("simpleRecord", simpleRecordSchema, 
null, null);
+    Field complexRecordField = new Field("complexRecord", complexRecordSchema, 
null, null);
+    Field complexListField = new Field("complexList", 
createArray(complexListSchema), null, null);
+
+    avroSchema = createRecord("manyComplexTypes", null, null, false);
+    avroSchema
+        .setFields(Lists.newArrayList(map1Field, map2Field, simpleRecordField, 
complexRecordField, complexListField));
+
+    List<Map<String, Object>> inputRecords = new ArrayList<>(2);
+    inputRecords.add(getRecord1());
+    inputRecords.add(getRecord2());
+    return inputRecords;
+  }
+
+  private Map<String, Object> getRecord1() {
+    Map<String, Object> record1 = new HashMap<>();
+
+    Map<Integer, String> map1 = new HashMap<>();
+    map1.put(30, "foo");
+    map1.put(200, "bar");
+    record1.put("map1", map1);
+
+    Map<String, Integer> map2 = new HashMap<>();
+    map2.put("k1", 10000);
+    map2.put("k2", 20000);
+    record1.put("map2", map2);
+
+    GenericRecord simpleRecord = new GenericData.Record(simpleRecordSchema);
+    simpleRecord.put("simpleField1", "foo");
+    simpleRecord.put("simpleField2", 1588469340000L);
+    simpleRecord.put("simpleList", Arrays.asList(1.1, 2.2));
+    record1.put("simpleRecord", simpleRecord);
+
+    GenericRecord complexRecord = new GenericData.Record(complexRecordSchema);
+    GenericRecord subComplexRecord = new 
GenericData.Record(complexFieldSchema);
+    subComplexRecord.put("field1", 100);
+    subComplexRecord.put("field2", 1588469340000L);
+    complexRecord.put("simpleField", "foo");
+    complexRecord.put("complexField", subComplexRecord);
+    record1.put("complexRecord", complexRecord);
+
+    GenericRecord listElem1 = new GenericData.Record(complexListSchema);
+    listElem1.put("field1", 20);
+    listElem1.put("field2", 2000200020002000L);
+    GenericRecord listElem2 = new GenericData.Record(complexListSchema);
+    listElem2.put("field1", 280);
+    listElem2.put("field2", 8000200020002000L);
+    record1.put("complexList", Arrays.asList(listElem1, listElem2));
+
+    return record1;
+  }
+
+  private Map<String, Object> getRecord2() {
+    Map<String, Object> record2 = new HashMap<>();
+    Map<Integer, String> map1 = new HashMap<>();
+    map1.put(30, "moo");
+    map1.put(200, "baz");
+    record2.put("map1", map1);
+
+    Map<String, Integer> map2 = new HashMap<>();
+    map2.put("k1", 100);
+    map2.put("k2", 200);
+    record2.put("map2", map2);
+
+    GenericRecord simpleRecord2 = new GenericData.Record(simpleRecordSchema);
+    simpleRecord2.put("simpleField1", "foo");
+    simpleRecord2.put("simpleField2", 1588469340000L);
+    simpleRecord2.put("simpleList", Arrays.asList(1.1, 2.2));
+    record2.put("simpleRecord", simpleRecord2);
+
+    GenericRecord complexRecord2 = new GenericData.Record(complexRecordSchema);
+    GenericRecord subComplexRecord2 = new 
GenericData.Record(complexFieldSchema);
+    subComplexRecord2.put("field1", 100);
+    subComplexRecord2.put("field2", 1588469340000L);
+    complexRecord2.put("simpleField", "foo");
+    complexRecord2.put("complexField", subComplexRecord2);
+    record2.put("complexRecord", complexRecord2);
+
+    GenericRecord listElem12 = new GenericData.Record(complexListSchema);
+    listElem12.put("field1", 20);
+    listElem12.put("field2", 2000200020002000L);
+    GenericRecord listElem22 = new GenericData.Record(complexListSchema);
+    listElem22.put("field1", 280);
+    listElem22.put("field2", 8000200020002000L);
+    record2.put("complexList", Arrays.asList(listElem12, listElem22));
+
+    return record2;
+  }
+
+  @Override
+  protected Set<String> getSourceFields() {
+    return Sets.newHashSet("map1", "map2", "simpleRecord", "complexRecord", 
"complexList");
+  }
+
+  /**
+   * Create an AvroRecordReader
+   */
+  @Override
+  protected RecordReader createRecordReader(Set<String> fieldsToRead)
+      throws IOException {
+    AvroRecordReader avroRecordReader = new AvroRecordReader();
+    avroRecordReader.init(_dataFile, fieldsToRead, null);
+    return avroRecordReader;
+  }
+
+  /**
+   * Create an Avro input file using the input records containing maps and 
record
+   */
+  @Override
+  protected void createInputFile()
+      throws IOException {
+
+    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, _dataFile);
+      for (Map<String, Object> inputRecord : _inputRecords) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        for (String columnName : _sourceFieldNames) {
+          record.put(columnName, inputRecord.get(columnName));
+        }
+        fileWriter.append(record);
+      }
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
deleted file mode 100644
index 552c011..0000000
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.inputformat.avro;
-
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
-import org.apache.pinot.spi.data.readers.RecordReader;
-
-import static org.apache.avro.Schema.*;
-
-
-/**
- * Tests the {@link AvroRecordExtractor} using a schema containing groovy 
transform functions for Avro maps
- */
-public class AvroRecordExtractorMapTypeTest extends 
AbstractRecordExtractorTest {
-
-  private final File _dataFile = new File(_tempDir, "maps.avro");
-
-  @Override
-  protected List<Map<String, Object>> getInputRecords() {
-    List<Map<String, Object>> inputRecords = new ArrayList<>(2);
-    Map<String, Object> record1 = new HashMap<>();
-    Map<Integer, String> map1 = new HashMap<>();
-    map1.put(30, "foo");
-    map1.put(200, "bar");
-    Map<String, Integer> map2 = new HashMap<>();
-    map2.put("k1", 10000);
-    map2.put("k2", 20000);
-    record1.put("map1", map1);
-    record1.put("map2", map2);
-    inputRecords.add(record1);
-
-    Map<String, Object> record2 = new HashMap<>();
-    map1 = new HashMap<>();
-    map1.put(30, "moo");
-    map1.put(200, "baz");
-    map2 = new HashMap<>();
-    map2.put("k1", 100);
-    map2.put("k2", 200);
-    record2.put("map1", map1);
-    record2.put("map2", map2);
-    inputRecords.add(record2);
-
-    return inputRecords;
-  }
-
-  @Override
-  protected Set<String> getSourceFields() {
-    return Sets.newHashSet("map1", "map2");
-  }
-
-  /**
-   * Create an AvroRecordReader
-   */
-  @Override
-  protected RecordReader createRecordReader(Set<String> fieldsToRead)
-      throws IOException {
-    AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, fieldsToRead, null);
-    return avroRecordReader;
-  }
-
-  /**
-   * Create an Avro input file using the input records containing maps
-   */
-  @Override
-  protected void createInputFile()
-      throws IOException {
-    org.apache.avro.Schema avroSchema = createRecord("mapRecord", null, null, 
false);
-    org.apache.avro.Schema intStringMapAvroSchema = 
createMap(create(Type.STRING));
-    org.apache.avro.Schema stringIntMapAvroSchema = 
createMap(create(Type.INT));
-    List<Field> fields = Arrays.asList(new Field("map1", 
intStringMapAvroSchema, null, null),
-        new Field("map2", stringIntMapAvroSchema, null, null));
-    avroSchema.setFields(fields);
-
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, _dataFile);
-      for (Map<String, Object> inputRecord : _inputRecords) {
-        GenericData.Record record = new GenericData.Record(avroSchema);
-        for (String columnName : _sourceFieldNames) {
-          record.put(columnName, inputRecord.get(columnName));
-        }
-        fileWriter.append(record);
-      }
-    }
-  }
-}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
index a806a8d..678265d 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.plugin.inputformat.json;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -47,66 +45,18 @@ public class JSONRecordExtractor implements 
RecordExtractor<Map<String, Object>>
   @Override
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, 
convertValue(value)));
+      from.forEach((fieldName, value) -> to.putValue(fieldName, 
JSONRecordExtractorUtils.convertValue(value)));
     } else {
       for (String fieldName : _fields) {
         Object value = from.get(fieldName);
         // NOTE about JSON behavior - cannot distinguish between INT/LONG and 
FLOAT/DOUBLE.
         // DataTypeTransformer fixes it.
-        Object convertedValue = convertValue(value);
+        Object convertedValue = JSONRecordExtractorUtils.convertValue(value);
         to.putValue(fieldName, convertedValue);
       }
     }
     return to;
   }
 
-  private Object convertValue(Object value) {
-    Object convertedValue;
-    if (value instanceof Collection) {
-      convertedValue = convertMultiValue((Collection) value);
-    } else {
-      convertedValue = convertSingleValue(value);
-    }
-    return convertedValue;
-  }
-
-  /**
-   * Converts the value to a single-valued value
-   */
-  @Nullable
-  private Object convertSingleValue(@Nullable Object value) {
-    if (value == null) {
-      return null;
-    }
-    if (value instanceof Number) {
-      return value;
-    }
-    return value.toString();
-  }
 
-  /**
-   * Converts the value to a multi-valued value
-   */
-  @Nullable
-  private Object convertMultiValue(@Nullable Collection values) {
-    if (values == null || values.isEmpty()) {
-      return null;
-    }
-    int numValues = values.size();
-    Object[] array = new Object[numValues];
-    int index = 0;
-    for (Object value : values) {
-      Object convertedValue = convertSingleValue(value);
-      if (convertedValue != null && !convertedValue.toString().equals("")) {
-        array[index++] = convertedValue;
-      }
-    }
-    if (index == numValues) {
-      return array;
-    } else if (index == 0) {
-      return null;
-    } else {
-      return Arrays.copyOf(array, index);
-    }
-  }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
similarity index 51%
copy from 
pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
copy to 
pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
index a806a8d..47e7927 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java
@@ -20,83 +20,56 @@ package org.apache.pinot.plugin.inputformat.json;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
 
 
 /**
- * Extractor for JSON records
+ * Helper methods for converting values from JSON records into values that Pinot understands
  */
-public class JSONRecordExtractor implements RecordExtractor<Map<String, 
Object>> {
+public final class JSONRecordExtractorUtils {
 
-  private Set<String> _fields;
-  private boolean _extractAll = false;
+  private JSONRecordExtractorUtils() {}
 
-  @Override
-  public void init(Set<String> fields, @Nullable RecordExtractorConfig 
recordExtractorConfig) {
-    _fields = fields;
-    if (fields == null || fields.isEmpty()) {
-      _extractAll = true;
-    }
-  }
-
-  @Override
-  public GenericRow extract(Map<String, Object> from, GenericRow to) {
-    if (_extractAll) {
-      from.forEach((fieldName, value) -> to.putValue(fieldName, 
convertValue(value)));
-    } else {
-      for (String fieldName : _fields) {
-        Object value = from.get(fieldName);
-        // NOTE about JSON behavior - cannot distinguish between INT/LONG and 
FLOAT/DOUBLE.
-        // DataTypeTransformer fixes it.
-        Object convertedValue = convertValue(value);
-        to.putValue(fieldName, convertedValue);
-      }
+  /**
+   * Converts the value to either a single value (string, number), a multi 
value (Object[]) or a Map
+   *
+   * Natively Pinot only understands single values and multi values.
+   * Map is useful only if some ingestion transform functions operates on it 
in the transformation layer
+   */
+  public static Object convertValue(Object value) {
+    if (value == null) {
+      return null;
     }
-    return to;
-  }
-
-  private Object convertValue(Object value) {
     Object convertedValue;
     if (value instanceof Collection) {
       convertedValue = convertMultiValue((Collection) value);
+    } else if (value instanceof Map) {
+      convertedValue = convertMap((Map) value);
     } else {
-      convertedValue = convertSingleValue(value);
+      if (value instanceof Number) {
+        convertedValue = value;
+      } else {
+        convertedValue = value.toString();
+      }
     }
     return convertedValue;
   }
 
   /**
-   * Converts the value to a single-valued value
+   * Applies conversion to each element of the collection
    */
   @Nullable
-  private Object convertSingleValue(@Nullable Object value) {
-    if (value == null) {
-      return null;
-    }
-    if (value instanceof Number) {
-      return value;
-    }
-    return value.toString();
-  }
-
-  /**
-   * Converts the value to a multi-valued value
-   */
-  @Nullable
-  private Object convertMultiValue(@Nullable Collection values) {
-    if (values == null || values.isEmpty()) {
+  private static Object convertMultiValue(Collection values) {
+    if (values.isEmpty()) {
       return null;
     }
     int numValues = values.size();
     Object[] array = new Object[numValues];
     int index = 0;
     for (Object value : values) {
-      Object convertedValue = convertSingleValue(value);
+      Object convertedValue = convertValue(value);
       if (convertedValue != null && !convertedValue.toString().equals("")) {
         array[index++] = convertedValue;
       }
@@ -109,4 +82,19 @@ public class JSONRecordExtractor implements 
RecordExtractor<Map<String, Object>>
       return Arrays.copyOf(array, index);
     }
   }
+
+  /**
+   * Applies the conversion to each value of the map
+   */
+  @Nullable
+  private static Object convertMap(Map map) {
+    if (map.isEmpty()) {
+      return null;
+    }
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Object key : map.keySet()) {
+      convertedMap.put(key, convertValue(map.get(key)));
+    }
+    return convertedMap;
+  }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java
new file mode 100644
index 0000000..8821aed
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+
+public class JSONRecordExtractorUtilsTest {
+
+  @Test(dataProvider = "conversionTestData")
+  public void testConversion(Object value, Object expectedConvertedValue)
+      throws JsonProcessingException {
+    Object convertedValue = JSONRecordExtractorUtils.convertValue(value);
+    Assert.assertEquals(JsonUtils.objectToString(convertedValue), 
JsonUtils.objectToString(expectedConvertedValue));
+  }
+
+  @DataProvider(name = "conversionTestData")
+  public Object[][] getConversionTestData()
+      throws IOException {
+    List<Object[]> input = new ArrayList<>();
+
+    String jsonString = "{\n"
+        + "  \"myInt\": 10,\n"
+        + "  \"myLong\": 1588469340000,\n"
+        + "  \"myDouble\": 10.2,\n"
+        + "  \"myNull\": null,\n"
+        + "  \"myString\": \"foo\",\n"
+        + "  \"myIntArray\": [10, 20, 30],\n"
+        + "  \"myStringArray\": [\"foo\", null, \"bar\"],\n"
+        + "  \"myDoubleArray\": [10.2, 12.1, 1.1],\n"
+        + "  \"myComplexArray1\": [{\"one\": 1, \"two\": \"too\"}, {\"one\": 
11, \"two\": \"roo\"}],\n"
+        + "  \"myComplexArray2\": [{\"one\":1, \"two\": {\"sub1\":1.1, 
\"sub2\": 1.2}, \"three\":[\"a\", \"b\"]}, {\"one\":11, \"two\": 
{\"sub1\":11.1, \"sub2\": 11.2}, \"three\":[\"aa\", \"bb\"]}],\n"
+        + "  \"myMap1\": {\"k1\": \"foo\", \"k2\": \"bar\"},\n"
+        + "  \"myMap2\": {\"k3\": {\"sub1\": 10, \"sub2\": 1.0}, \"k4\": 
\"baz\", \"k5\": [1,2,3]}\n" + "}";
+    Map<String, Object> jsonNode = new ObjectMapper().readValue(jsonString, 
Map.class);
+    input.add(new Object[]{jsonNode.get("myNull"), null});
+
+    input.add(new Object[]{jsonNode.get("myInt"), 10});
+
+    input.add(new Object[]{jsonNode.get("myLong"), 1588469340000L});
+
+    input.add(new Object[]{jsonNode.get("myDouble"), 10.2});
+
+    input.add(new Object[]{jsonNode.get("myString"), "foo"});
+
+    input.add(new Object[]{jsonNode.get("myIntArray"), new Object[]{10, 20, 
30}});
+
+    input.add(new Object[]{jsonNode.get("myDoubleArray"), new Object[]{10.2, 
12.1, 1.1}});
+
+    input.add(new Object[]{jsonNode.get("myStringArray"), new Object[]{"foo", 
"bar"}});
+
+    Map<String, Object> map1 = new HashMap<>();
+    map1.put("one", 1);
+    map1.put("two", "too");
+    Map<String, Object> map2 = new HashMap<>();
+    map2.put("one", 11);
+    map2.put("two", "roo");
+    input.add(new Object[]{jsonNode.get("myComplexArray1"), new Object[]{map1, 
map2}});
+
+    Map<String, Object> map3 = new HashMap<>();
+    map3.put("one", 1);
+    Map<String, Object> map31 = new HashMap<>();
+    map31.put("sub1", 1.1);
+    map31.put("sub2", 1.2);
+    map3.put("two", map31);
+    map3.put("three", new Object[]{"a", "b"});
+    Map<String, Object> map4 = new HashMap<>();
+    map4.put("one", 11);
+    Map<String, Object> map41 = new HashMap<>();
+    map41.put("sub1", 11.1);
+    map41.put("sub2", 11.2);
+    map4.put("two", map41);
+    map4.put("three", new Object[]{"aa", "bb"});
+    input.add(new Object[]{jsonNode.get("myComplexArray2"), new Object[]{map3, 
map4}});
+
+    Map<String, Object> map5 = new HashMap<>();
+    map5.put("k1", "foo");
+    map5.put("k2", "bar");
+    input.add(new Object[]{jsonNode.get("myMap1"), map5});
+
+    Map<String, Object> map6 = new HashMap<>();
+    Map<String, Object> map61 = new HashMap<>();
+    map61.put("sub1", 10);
+    map61.put("sub2", 1.0);
+    map6.put("k3", map61);
+    map6.put("k4", "baz");
+    map6.put("k5", new Object[]{1, 2, 3});
+    input.add(new Object[]{jsonNode.get("myMap2"), map6});
+
+    return input.toArray(new Object[0][]);
+  }
+
+}
\ No newline at end of file
diff --git a/pinot-server/src/test/resources/data/test_data-mv.avro 
b/pinot-server/src/test/resources/data/test_data-mv.avro
index 151e091..aff4520 100644
Binary files a/pinot-server/src/test/resources/data/test_data-mv.avro and 
b/pinot-server/src/test/resources/data/test_data-mv.avro differ
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
index 36c919f..f1fc762 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
@@ -23,6 +23,9 @@ import java.util.Set;
 
 /**
  * Extracts fields from input records
+ * 1) Number/String/ByteBuffer become single-value column
+ * 2) Collections become Object[] i.e. multi-value column
+ * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become 
Map<Object, Object>
  * @param <T> The format of the input record
  */
 public interface RecordExtractor<T> {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
index e3a1f4a..052209a 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.Assert;
@@ -114,25 +115,35 @@ public abstract class AbstractRecordExtractorTest {
       String columnName = entry.getKey();
       Object expectedValue = entry.getValue();
       Object actualValue = genericRow.getValue(columnName);
-      if (expectedValue instanceof Collection) {
-        List actualArray =
-            actualValue instanceof List ? (ArrayList) actualValue : 
Arrays.asList((Object[]) actualValue);
-        List expectedArray = (List) expectedValue;
-        for (int j = 0; j < actualArray.size(); j++) {
-          Assert.assertEquals(actualArray.get(j), expectedArray.get(j));
-        }
-      } else if (expectedValue instanceof Map) {
-        Map<Object, Object> actualMap = (HashMap) actualValue;
-        Map<Object, Object> expectedMap = (HashMap) expectedValue;
-        for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) {
-          Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), 
mapEntry.getValue());
-        }
+      checkValue(expectedValue, actualValue);
+    }
+  }
+
+  private void checkValue(Object expectedValue, Object actualValue) {
+    if (expectedValue instanceof Collection) {
+      List actualArray =
+          actualValue instanceof List ? (ArrayList) actualValue : 
Arrays.asList((Object[]) actualValue);
+      List expectedArray = (List) expectedValue;
+      for (int j = 0; j < actualArray.size(); j++) {
+        checkValue(expectedArray.get(j), actualArray.get(j));
+      }
+    } else if (expectedValue instanceof Map) {
+      Map<Object, Object> actualMap = (HashMap) actualValue;
+      Map<Object, Object> expectedMap = (HashMap) expectedValue;
+      for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) {
+        Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), 
mapEntry.getValue());
+      }
+    } else if (expectedValue instanceof GenericData.Record) {
+      Map<Object, Object> actualMap = (HashMap) actualValue;
+      GenericData.Record expectedGenericRecord = (GenericData.Record) 
expectedValue;
+      for (Map.Entry<Object, Object> mapEntry : actualMap.entrySet()) {
+        checkValue(expectedGenericRecord.get(mapEntry.getKey().toString()), 
mapEntry.getValue());
+      }
+    } else {
+      if (expectedValue != null) {
+        Assert.assertEquals(actualValue, expectedValue);
       } else {
-        if (expectedValue != null) {
-          Assert.assertEquals(actualValue, expectedValue);
-        } else {
-          Assert.assertNull(actualValue);
-        }
+        Assert.assertNull(actualValue);
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to