This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new d54b04a  Deep extraction in Avro and Json RecordExtractor (#5492)
d54b04a is described below

commit d54b04a2562f86dfb3adaa02ff400951d8108738
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Fri Jun 5 18:05:29 2020 -0700

    Deep extraction in Avro and Json RecordExtractor (#5492)

    - Json handles Map/Collection/single-values
    - Avro handles Map/Collection/GenericData.Record/single-values
    - Added json_format function to convert json object to json string
---
 .../pinot/common/function/JsonFunctions.java       |   9 +
 ...valuatorTest.java => InbuiltFunctionsTest.java} |  59 +++++-
 .../src/test/resources/data/test_data-mv.avro      | Bin 9583323 -> 7700192 bytes
 .../inputformat/avro/AvroRecordExtractor.java      |   3 +-
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |  67 ++++---
 .../avro/AvroRecordExtractorComplexTypesTest.java  | 209 +++++++++++++++++++++
 .../avro/AvroRecordExtractorMapTypeTest.java       | 115 ------------
 .../inputformat/json/JSONRecordExtractor.java      |  54 +-----
 ...xtractor.java => JSONRecordExtractorUtils.java} |  90 ++++-----
 .../json/JSONRecordExtractorUtilsTest.java         | 120 ++++++++++++
 .../src/test/resources/data/test_data-mv.avro      | Bin 9583323 -> 7700192 bytes
 .../pinot/spi/data/readers/RecordExtractor.java    |   3 +
 .../data/readers/AbstractRecordExtractorTest.java  |  47 +++--
 13 files changed, 514 insertions(+), 262 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java index 1e5cd75..e0d3b06 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/JsonFunctions.java @@ -46,4 +46,13 @@ public class JsonFunctions { return JsonUtils.objectToString(map); } + /** + * Convert object to Json String + */ + @ScalarFunction + static String json_format(Object object) + throws JsonProcessingException { + return JsonUtils.objectToString(object); + } + }
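Before the test changes below, a minimal sketch of how the new json_format scalar function is exercised (assembled from the InbuiltFunctionsTest code in this patch; the evaluator's import path is assumed from the test's package, so treat it as illustrative):

    import java.util.Map;
    import org.apache.pinot.core.data.function.InbuiltFunctionEvaluator;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.utils.JsonUtils;

    class JsonFormatSketch {
      static String example() throws Exception {
        GenericRow row = new GenericRow();
        // a Map value, as the reworked RecordExtractors now produce for nested fields
        row.putValue("jsonMap", JsonUtils.stringToObject("{\"k1\":\"foo\",\"k2\":\"bar\"}", Map.class));
        // json_format serializes the object back to a JSON string: {"k1":"foo","k2":"bar"}
        return (String) new InbuiltFunctionEvaluator("json_format(jsonMap)").evaluate(row);
      }
    }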
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java similarity index 77% rename from pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java rename to pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java index 9de5956..9532ea8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionsTest.java @@ -19,19 +19,21 @@ package org.apache.pinot.core.data.function; import com.google.common.collect.Lists; +import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.pinot.common.function.DateTimeFunctions; +import java.util.Map; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.JsonUtils; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** - * Tests the Pinot inbuilt transform functions in {@link DateTimeFunctions} which perform date time conversion + * Tests the Pinot inbuilt transform functions */ -public class DateTimeFunctionEvaluatorTest { +public class InbuiltFunctionsTest { @Test(dataProvider = "dateTimeFunctionsTestDataProvider") public void testDateTimeTransformFunctions(String transformFunction, List<String> arguments, GenericRow row, @@ -206,4 +208,55 @@ public class DateTimeFunctionEvaluatorTest { return inputs.toArray(new Object[0][]); } + + @Test(dataProvider = "jsonFunctionDataProvider") + public void testJsonFunctions(String transformFunction, List<String> arguments, GenericRow row, Object result) + throws Exception { + InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(transformFunction); + Assert.assertEquals(evaluator.getArguments(), arguments); + Assert.assertEquals(evaluator.evaluate(row), result); + } + + @DataProvider(name = "jsonFunctionDataProvider") + public Object[][] jsonFunctionsDataProvider() + throws IOException { + List<Object[]> inputs = new ArrayList<>(); + + // toJsonMapStr + GenericRow row0 = new GenericRow(); + String jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}"; + row0.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class)); + inputs.add(new Object[]{"toJsonMapStr(jsonMap)", Lists.newArrayList("jsonMap"), row0, jsonStr}); + + GenericRow row1 = new GenericRow(); + jsonStr = "{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}"; + row1.putValue("jsonMap", JsonUtils + .stringToObject(jsonStr, Map.class)); + inputs.add(new Object[]{"toJsonMapStr(jsonMap)", Lists.newArrayList("jsonMap"), row1, jsonStr}); + + GenericRow row2 = new GenericRow(); + jsonStr = "{\"k1\":\"foo\",\"k2\":\"bar\"}"; + row2.putValue("jsonMap", JsonUtils.stringToObject(jsonStr, Map.class)); + inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row2, jsonStr}); + + GenericRow row3 = new GenericRow(); + jsonStr = "{\"k3\":{\"sub1\":10,\"sub2\":1.0},\"k4\":\"baz\",\"k5\":[1,2,3]}"; + row3.putValue("jsonMap", JsonUtils + .stringToObject(jsonStr, Map.class)); + inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row3, jsonStr}); + + GenericRow row4 = new GenericRow(); + jsonStr = "[{\"one\":1,\"two\":\"too\"},{\"one\":11,\"two\":\"roo\"}]"; + row4.putValue("jsonMap", JsonUtils + .stringToObject(jsonStr, List.class)); + inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row4, jsonStr}); + + GenericRow row5 = new GenericRow(); + jsonStr = "[{\"one\":1,\"two\":{\"sub1\":1.1,\"sub2\":1.2},\"three\":[\"a\",\"b\"]},{\"one\":11,\"two\":{\"sub1\":11.1,\"sub2\":11.2},\"three\":[\"aa\",\"bb\"]}]"; + row5.putValue("jsonMap", JsonUtils + .stringToObject(jsonStr, List.class)); + inputs.add(new Object[]{"json_format(jsonMap)", Lists.newArrayList("jsonMap"), row5, jsonStr}); + + return inputs.toArray(new Object[0][]); + } } diff --git a/pinot-core/src/test/resources/data/test_data-mv.avro b/pinot-core/src/test/resources/data/test_data-mv.avro index 151e091..aff4520 100644 Binary files a/pinot-core/src/test/resources/data/test_data-mv.avro and b/pinot-core/src/test/resources/data/test_data-mv.avro differ diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java index b9ac3cd..339ab67 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java @@ -25,7 +25,6 @@ import org.apache.avro.generic.GenericRecord; import
org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordExtractor; import org.apache.pinot.spi.data.readers.RecordExtractorConfig; -import org.apache.pinot.spi.data.readers.RecordReaderUtils; import org.apache.pinot.spi.utils.JsonUtils; @@ -48,7 +47,7 @@ public class AvroRecordExtractor implements RecordExtractor<GenericRecord> { public GenericRow extract(GenericRecord from, GenericRow to) { if (_extractAll) { Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from); - jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, RecordReaderUtils.convert(value))); + jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, AvroUtils.convert(value))); } else { for (String fieldName : _fields) { Object value = from.get(fieldName); diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java index f4ac41c..c089bfc 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -284,59 +285,83 @@ public class AvroUtils { } /** - * Converts the value to a single-valued value or a multi-valued value + * Converts the value to either a single value (string, number, bytebuffer), multi value (Object[]) or a Map + * + * Natively, Pinot only understands single values and multi values. + * Map is useful only if some ingestion transform functions operate on it in the transformation layer */ public static Object convert(Object value) { + if (value == null) { + return null; + } Object convertedValue; if (value instanceof Collection) { convertedValue = handleMultiValue((Collection) value); } else if (value instanceof Map) { convertedValue = handleMap((Map) value); + } else if (value instanceof GenericData.Record) { + convertedValue = handleGenericRecord((GenericData.Record) value); } else { - convertedValue = handleSingleValue(value); + convertedValue = RecordReaderUtils.convertSingleValue(value); } return convertedValue; } /** - * Converts the value to a single-valued value by handling instance of ByteBuffer, Number and String + * Handles the conversion of each value of the Collection.
+ * Converts the Collection to an Object array */ - public static Object handleSingleValue(@Nullable Object value) { - if (value == null) { + public static Object handleMultiValue(Collection values) { + + if (values.isEmpty()) { return null; } - if (value instanceof GenericData.Record) { - return handleSingleValue(((GenericData.Record) value).get(0)); + int numValues = values.size(); + Object[] array = new Object[numValues]; + int index = 0; + for (Object value : values) { + Object convertedValue = convert(value); + if (convertedValue != null && !convertedValue.toString().equals("")) { + array[index++] = convertedValue; + } + } + if (index == numValues) { + return array; + } else if (index == 0) { + return null; + } else { + return Arrays.copyOf(array, index); } - return RecordReaderUtils.convertSingleValue(value); } /** - * Converts the value to a multi-valued column + * Handles the conversion of every value of the Map */ - public static Object handleMultiValue(@Nullable Collection values) { - if (values == null || values.isEmpty()) { + public static Object handleMap(Map map) { + if (map.isEmpty()) { return null; } - int numValues = values.size(); - List<Object> list = new ArrayList<>(numValues); - for (Object value : values) { - list.add(handleSingleValue(value)); + + Map<Object, Object> convertedMap = new HashMap<>(); + for (Object key : map.keySet()) { + convertedMap.put(RecordReaderUtils.convertSingleValue(key), convert(map.get(key))); } - return RecordReaderUtils.convertMultiValue(list); + return convertedMap; } /** - * Converts the values within the map to single-valued values + * Handles the conversion of every field of the GenericRecord */ - public static Object handleMap(@Nullable Map map) { - if (map == null || map.isEmpty()) { + private static Object handleGenericRecord(GenericData.Record record) { + List<Field> fields = record.getSchema().getFields(); + if (fields.isEmpty()) { return null; } Map<Object, Object> convertedMap = new HashMap<>(); - for (Object key : map.keySet()) { - convertedMap.put(RecordReaderUtils.convertSingleValue(key), RecordReaderUtils.convertSingleValue(map.get(key))); + for (Field field : fields) { + String fieldName = field.name(); + convertedMap.put(fieldName, convert(record.get(fieldName))); } return convertedMap; } diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java new file mode 100644 index 0000000..d7584ed --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorComplexTypesTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.avro; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest; +import org.apache.pinot.spi.data.readers.RecordReader; + +import static org.apache.avro.Schema.*; + + +/** + * Tests the {@link AvroRecordExtractor} for Avro Map and Avro Record + */ +public class AvroRecordExtractorComplexTypesTest extends AbstractRecordExtractorTest { + + private final File _dataFile = new File(_tempDir, "complex.avro"); + Schema avroSchema; + Schema intStringMapAvroSchema; + Schema stringIntMapAvroSchema; + Schema simpleRecordSchema; + Schema complexRecordSchema; + Schema complexFieldSchema; + Schema complexListSchema; + + @Override + protected List<Map<String, Object>> getInputRecords() { + + // map with int keys + intStringMapAvroSchema = createMap(create(Type.STRING)); + + // map with string keys + stringIntMapAvroSchema = createMap(create(Type.INT)); + + // simple record - contains a string, long and double array + simpleRecordSchema = createRecord("simpleRecord", null, null, false); + simpleRecordSchema.setFields(Lists.newArrayList(new Field("simpleField1", create(Type.STRING), null, null), + new Field("simpleField2", create(Type.LONG), null, null), + new Field("simpleList", createArray(create(Type.DOUBLE)), null, null))); + + // complex record - contains a string, a complex field (contains int and long) + complexRecordSchema = createRecord("complexRecord", null, null, false); + complexFieldSchema = createRecord("complexField", null, null, false); + complexFieldSchema.setFields(Lists.newArrayList(new Field("field1", create(Type.INT), null, null), + new Field("field2", create(Type.LONG), null, null))); + complexRecordSchema.setFields(Lists.newArrayList(new Field("simpleField", create(Type.STRING), null, null), + new Field("complexField", complexFieldSchema, null, null))); + + // complex list element - each element contains a record of int and long + complexListSchema = createRecord("complexList", null, null, false); + complexListSchema.setFields(Lists.newArrayList(new Field("field1", create(Type.INT), null, null), + new Field("field2", create(Type.LONG), null, null))); + + Field map1Field = new Field("map1", intStringMapAvroSchema, null, null); + Field map2Field = new Field("map2", stringIntMapAvroSchema, null, null); + Field simpleRecordField = new Field("simpleRecord", simpleRecordSchema, null, null); + Field complexRecordField = new Field("complexRecord", complexRecordSchema, null, null); + Field complexListField = new Field("complexList", createArray(complexListSchema), null, null); + + avroSchema = createRecord("manyComplexTypes", null, null, false); + avroSchema + .setFields(Lists.newArrayList(map1Field, map2Field, simpleRecordField, complexRecordField, complexListField)); + + List<Map<String, Object>> inputRecords = new ArrayList<>(2); + inputRecords.add(getRecord1()); + inputRecords.add(getRecord2()); + return inputRecords; + } + 
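Given the complex Avro schema assembled above, the intent of the deep extraction being tested here (my reading of the AvroUtils.convert changes earlier in this patch, not code taken from it) is that a nested GenericData.Record surfaces as a Map keyed by field name, and an Avro array surfaces as an Object[]. For record 1, roughly:

    import java.util.HashMap;
    import java.util.Map;

    class ExpectedDeepExtractionSketch {
      // Assumed extracted value of the "complexRecord" column for getRecord1():
      static Map<String, Object> expectedComplexRecord() {
        Map<String, Object> complexField = new HashMap<>();
        complexField.put("field1", 100);
        complexField.put("field2", 1588469340000L);
        Map<String, Object> complexRecord = new HashMap<>();
        complexRecord.put("simpleField", "foo");
        complexRecord.put("complexField", complexField); // nested record -> nested Map
        return complexRecord;
      }
      // "complexList" would likewise become an Object[] holding one such Map per element.
    }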
+ private Map<String, Object> getRecord1() { + Map<String, Object> record1 = new HashMap<>(); + + Map<Integer, String> map1 = new HashMap<>(); + map1.put(30, "foo"); + map1.put(200, "bar"); + record1.put("map1", map1); + + Map<String, Integer> map2 = new HashMap<>(); + map2.put("k1", 10000); + map2.put("k2", 20000); + record1.put("map2", map2); + + GenericRecord simpleRecord = new GenericData.Record(simpleRecordSchema); + simpleRecord.put("simpleField1", "foo"); + simpleRecord.put("simpleField2", 1588469340000L); + simpleRecord.put("simpleList", Arrays.asList(1.1, 2.2)); + record1.put("simpleRecord", simpleRecord); + + GenericRecord complexRecord = new GenericData.Record(complexRecordSchema); + GenericRecord subComplexRecord = new GenericData.Record(complexFieldSchema); + subComplexRecord.put("field1", 100); + subComplexRecord.put("field2", 1588469340000L); + complexRecord.put("simpleField", "foo"); + complexRecord.put("complexField", subComplexRecord); + record1.put("complexRecord", complexRecord); + + GenericRecord listElem1 = new GenericData.Record(complexListSchema); + listElem1.put("field1", 20); + listElem1.put("field2", 2000200020002000L); + GenericRecord listElem2 = new GenericData.Record(complexListSchema); + listElem2.put("field1", 280); + listElem2.put("field2", 8000200020002000L); + record1.put("complexList", Arrays.asList(listElem1, listElem2)); + + return record1; + } + + private Map<String, Object> getRecord2() { + Map<String, Object> record2 = new HashMap<>(); + Map<Integer, String> map1 = new HashMap<>(); + map1.put(30, "moo"); + map1.put(200, "baz"); + record2.put("map1", map1); + + Map<String, Integer> map2 = new HashMap<>(); + map2.put("k1", 100); + map2.put("k2", 200); + record2.put("map2", map2); + + GenericRecord simpleRecord2 = new GenericData.Record(simpleRecordSchema); + simpleRecord2.put("simpleField1", "foo"); + simpleRecord2.put("simpleField2", 1588469340000L); + simpleRecord2.put("simpleList", Arrays.asList(1.1, 2.2)); + record2.put("simpleRecord", simpleRecord2); + + GenericRecord complexRecord2 = new GenericData.Record(complexRecordSchema); + GenericRecord subComplexRecord2 = new GenericData.Record(complexFieldSchema); + subComplexRecord2.put("field1", 100); + subComplexRecord2.put("field2", 1588469340000L); + complexRecord2.put("simpleField", "foo"); + complexRecord2.put("complexField", subComplexRecord2); + record2.put("complexRecord", complexRecord2); + + GenericRecord listElem12 = new GenericData.Record(complexListSchema); + listElem12.put("field1", 20); + listElem12.put("field2", 2000200020002000L); + GenericRecord listElem22 = new GenericData.Record(complexListSchema); + listElem22.put("field1", 280); + listElem22.put("field2", 8000200020002000L); + record2.put("complexList", Arrays.asList(listElem12, listElem22)); + + return record2; + } + + @Override + protected Set<String> getSourceFields() { + return Sets.newHashSet("map1", "map2", "simpleRecord", "complexRecord", "complexList"); + } + + /** + * Create an AvroRecordReader + */ + @Override + protected RecordReader createRecordReader(Set<String> fieldsToRead) + throws IOException { + AvroRecordReader avroRecordReader = new AvroRecordReader(); + avroRecordReader.init(_dataFile, fieldsToRead, null); + return avroRecordReader; + } + + /** + * Create an Avro input file using the input records containing maps and record + */ + @Override + protected void createInputFile() + throws IOException { + + try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, _dataFile); + for (Map<String, Object> inputRecord : _inputRecords) { + GenericData.Record record = new GenericData.Record(avroSchema); + for (String columnName : _sourceFieldNames) { + record.put(columnName, inputRecord.get(columnName)); + } + fileWriter.append(record); + } + } + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java deleted file mode 100644 index 552c011..0000000 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.inputformat.avro; - -import com.google.common.collect.Sets; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest; -import org.apache.pinot.spi.data.readers.RecordReader; - -import static org.apache.avro.Schema.*; - - -/** - * Tests the {@link AvroRecordExtractor} using a schema containing groovy transform functions for Avro maps - */ -public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest { - - private final File _dataFile = new File(_tempDir, "maps.avro"); - - @Override - protected List<Map<String, Object>> getInputRecords() { - List<Map<String, Object>> inputRecords = new ArrayList<>(2); - Map<String, Object> record1 = new HashMap<>(); - Map<Integer, String> map1 = new HashMap<>(); - map1.put(30, "foo"); - map1.put(200, "bar"); - Map<String, Integer> map2 = new HashMap<>(); - map2.put("k1", 10000); - map2.put("k2", 20000); - record1.put("map1", map1); - record1.put("map2", map2); - inputRecords.add(record1); - - Map<String, Object> record2 = new HashMap<>(); - map1 = new HashMap<>(); - map1.put(30, "moo"); - map1.put(200, "baz"); - map2 = new HashMap<>(); - map2.put("k1", 100); - map2.put("k2", 200); - record2.put("map1", map1); - record2.put("map2", map2); - inputRecords.add(record2); - - return inputRecords; - } - - @Override - protected Set<String> getSourceFields() { - return Sets.newHashSet("map1", "map2"); - } - - /** - * Create an AvroRecordReader - */ - @Override - protected 
RecordReader createRecordReader(Set<String> fieldsToRead) - throws IOException { - AvroRecordReader avroRecordReader = new AvroRecordReader(); - avroRecordReader.init(_dataFile, fieldsToRead, null); - return avroRecordReader; - } - - /** - * Create an Avro input file using the input records containing maps - */ - @Override - protected void createInputFile() - throws IOException { - org.apache.avro.Schema avroSchema = createRecord("mapRecord", null, null, false); - org.apache.avro.Schema intStringMapAvroSchema = createMap(create(Type.STRING)); - org.apache.avro.Schema stringIntMapAvroSchema = createMap(create(Type.INT)); - List<Field> fields = Arrays.asList(new Field("map1", intStringMapAvroSchema, null, null), - new Field("map2", stringIntMapAvroSchema, null, null)); - avroSchema.setFields(fields); - - try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { - fileWriter.create(avroSchema, _dataFile); - for (Map<String, Object> inputRecord : _inputRecords) { - GenericData.Record record = new GenericData.Record(avroSchema); - for (String columnName : _sourceFieldNames) { - record.put(columnName, inputRecord.get(columnName)); - } - fileWriter.append(record); - } - } - } -} diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java index a806a8d..678265d 100644 --- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.plugin.inputformat.json; -import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -47,66 +45,18 @@ public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>> @Override public GenericRow extract(Map<String, Object> from, GenericRow to) { if (_extractAll) { - from.forEach((fieldName, value) -> to.putValue(fieldName, convertValue(value))); + from.forEach((fieldName, value) -> to.putValue(fieldName, JSONRecordExtractorUtils.convertValue(value))); } else { for (String fieldName : _fields) { Object value = from.get(fieldName); // NOTE about JSON behavior - cannot distinguish between INT/LONG and FLOAT/DOUBLE. // DataTypeTransformer fixes it. 
- Object convertedValue = convertValue(value); + Object convertedValue = JSONRecordExtractorUtils.convertValue(value); to.putValue(fieldName, convertedValue); } } return to; } - private Object convertValue(Object value) { - Object convertedValue; - if (value instanceof Collection) { - convertedValue = convertMultiValue((Collection) value); - } else { - convertedValue = convertSingleValue(value); - } - return convertedValue; - } - - /** - * Converts the value to a single-valued value - */ - @Nullable - private Object convertSingleValue(@Nullable Object value) { - if (value == null) { - return null; - } - if (value instanceof Number) { - return value; - } - return value.toString(); - } - /** - * Converts the value to a multi-valued value - */ - @Nullable - private Object convertMultiValue(@Nullable Collection values) { - if (values == null || values.isEmpty()) { - return null; - } - int numValues = values.size(); - Object[] array = new Object[numValues]; - int index = 0; - for (Object value : values) { - Object convertedValue = convertSingleValue(value); - if (convertedValue != null && !convertedValue.toString().equals("")) { - array[index++] = convertedValue; - } - } - if (index == numValues) { - return array; - } else if (index == 0) { - return null; - } else { - return Arrays.copyOf(array, index); - } - } }
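The convertValue logic deleted above moves into the new JSONRecordExtractorUtils helper, whose diff follows. A minimal sketch of the helper's behavior, inferred from the JSONRecordExtractorUtilsTest added later in this patch (the wrapper class here is mine; convertValue is public static, in the package shown in the diff):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;
    import org.apache.pinot.plugin.inputformat.json.JSONRecordExtractorUtils;

    class ConvertValueSketch {
      static Object example() throws Exception {
        Map<String, Object> json =
            new ObjectMapper().readValue("{\"k4\":\"baz\",\"k5\":[1,2,3]}", Map.class);
        // Nested values convert recursively: "k4" stays "baz",
        // "k5" becomes Object[]{1, 2, 3}, and the result comes back as a Map.
        return JSONRecordExtractorUtils.convertValue(json);
      }
    }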
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java similarity index 51% copy from pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java copy to pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java index a806a8d..47e7927 100644 --- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtils.java @@ -20,83 +20,56 @@ package org.apache.pinot.plugin.inputformat.json; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Map; -import java.util.Set; import javax.annotation.Nullable; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordExtractor; -import org.apache.pinot.spi.data.readers.RecordExtractorConfig; /** - * Extractor for JSON records + * Helper methods for converting values from json nodes to single values, multi values (Object[]) or Maps */ -public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>> { +public final class JSONRecordExtractorUtils { - private Set<String> _fields; - private boolean _extractAll = false; + private JSONRecordExtractorUtils() {} - @Override - public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) { - _fields = fields; - if (fields == null || fields.isEmpty()) { - _extractAll = true; - } - } - - @Override - public GenericRow extract(Map<String, Object> from, GenericRow to) { - if (_extractAll) { - from.forEach((fieldName, value) -> to.putValue(fieldName, convertValue(value))); - } else { - for (String fieldName : _fields) { - Object value = from.get(fieldName); - // NOTE about JSON behavior - cannot distinguish between INT/LONG and FLOAT/DOUBLE. - // DataTypeTransformer fixes it. - Object convertedValue = convertValue(value); - to.putValue(fieldName, convertedValue); - } + /** + * Converts the value to either a single value (string, number), a multi value (Object[]) or a Map + * + * Natively, Pinot only understands single values and multi values. + * Map is useful only if some ingestion transform functions operate on it in the transformation layer + */ + public static Object convertValue(Object value) { + if (value == null) { + return null; } - return to; - } - - private Object convertValue(Object value) { Object convertedValue; if (value instanceof Collection) { convertedValue = convertMultiValue((Collection) value); + } else if (value instanceof Map) { + convertedValue = convertMap((Map) value); } else { - convertedValue = convertSingleValue(value); + if (value instanceof Number) { + convertedValue = value; + } else { + convertedValue = value.toString(); + } } return convertedValue; } /** - * Converts the value to a single-valued value + * Applies conversion to each element of the collection */ @Nullable - private Object convertSingleValue(@Nullable Object value) { - if (value == null) { - return null; - } - if (value instanceof Number) { - return value; - } - return value.toString(); - } - - /** - * Converts the value to a multi-valued value - */ - @Nullable - private Object convertMultiValue(@Nullable Collection values) { - if (values == null || values.isEmpty()) { + private static Object convertMultiValue(Collection values) { + if (values.isEmpty()) { return null; } int numValues = values.size(); Object[] array = new Object[numValues]; int index = 0; for (Object value : values) { - Object convertedValue = convertSingleValue(value); + Object convertedValue = convertValue(value); if (convertedValue != null && !convertedValue.toString().equals("")) { array[index++] = convertedValue; } @@ -109,4 +82,19 @@ public class JSONRecordExtractor implements RecordExtractor<Map<String, Object>> return Arrays.copyOf(array, index); } } + + /** + * Applies the conversion to each value of the map + */ + @Nullable + private static Object convertMap(Map map) { + if (map.isEmpty()) { + return null; + } + Map<Object, Object> convertedMap = new HashMap<>(); + for (Object key : map.keySet()) { + convertedMap.put(key, convertValue(map.get(key))); + } + return convertedMap; + } } diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java new file mode 100644 index 0000000..8821aed --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorUtilsTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + + +public class JSONRecordExtractorUtilsTest { + + @Test(dataProvider = "conversionTestData") + public void testConversion(Object value, Object expectedConvertedValue) + throws JsonProcessingException { + Object convertedValue = JSONRecordExtractorUtils.convertValue(value); + Assert.assertEquals(JsonUtils.objectToString(convertedValue), JsonUtils.objectToString(expectedConvertedValue)); + } + + @DataProvider(name = "conversionTestData") + public Object[][] getConversionTestData() + throws IOException { + List<Object[]> input = new ArrayList<>(); + + String jsonString = "{\n" + + " \"myInt\": 10,\n" + + " \"myLong\": 1588469340000,\n" + + " \"myDouble\": 10.2,\n" + + " \"myNull\": null,\n" + + " \"myString\": \"foo\",\n" + + " \"myIntArray\": [10, 20, 30],\n" + + " \"myStringArray\": [\"foo\", null, \"bar\"],\n" + + " \"myDoubleArray\": [10.2, 12.1, 1.1],\n" + + " \"myComplexArray1\": [{\"one\": 1, \"two\": \"too\"}, {\"one\": 11, \"two\": \"roo\"}],\n" + + " \"myComplexArray2\": [{\"one\":1, \"two\": {\"sub1\":1.1, \"sub2\": 1.2}, \"three\":[\"a\", \"b\"]}, {\"one\":11, \"two\": {\"sub1\":11.1, \"sub2\": 11.2}, \"three\":[\"aa\", \"bb\"]}],\n" + + " \"myMap1\": {\"k1\": \"foo\", \"k2\": \"bar\"},\n" + + " \"myMap2\": {\"k3\": {\"sub1\": 10, \"sub2\": 1.0}, \"k4\": \"baz\", \"k5\": [1,2,3]}\n" + "}"; + Map<String, Object> jsonNode = new ObjectMapper().readValue(jsonString, Map.class); + input.add(new Object[]{jsonNode.get("myNull"), null}); + + input.add(new Object[]{jsonNode.get("myInt"), 10}); + + input.add(new Object[]{jsonNode.get("myLong"), 1588469340000L}); + + input.add(new Object[]{jsonNode.get("myDouble"), 10.2}); + + input.add(new Object[]{jsonNode.get("myString"), "foo"}); + + input.add(new Object[]{jsonNode.get("myIntArray"), new Object[]{10, 20, 30}}); + + input.add(new Object[]{jsonNode.get("myDoubleArray"), new Object[]{10.2, 12.1, 1.1}}); + + input.add(new Object[]{jsonNode.get("myStringArray"), new Object[]{"foo", "bar"}}); + + Map<String, Object> map1 = new HashMap<>(); + map1.put("one", 1); + map1.put("two", "too"); + Map<String, Object> map2 = new HashMap<>(); + map2.put("one", 11); + map2.put("two", "roo"); + input.add(new Object[]{jsonNode.get("myComplexArray1"), new Object[]{map1, map2}}); + + Map<String, Object> map3 = new HashMap<>(); + map3.put("one", 1); + Map<String, Object> map31 = new HashMap<>(); + map31.put("sub1", 1.1); + map31.put("sub2", 1.2); + map3.put("two", map31); + map3.put("three", new Object[]{"a", "b"}); + Map<String, Object> map4 = new HashMap<>(); + map4.put("one", 11); + Map<String, Object> map41 = new HashMap<>(); + map41.put("sub1", 11.1); + map41.put("sub2", 11.2); + map4.put("two", map41); + map4.put("three", new Object[]{"aa", "bb"}); + input.add(new Object[]{jsonNode.get("myComplexArray2"), new Object[]{map3, map4}}); + + Map<String, Object> map5 = new HashMap<>(); + map5.put("k1", "foo"); + map5.put("k2", "bar"); + input.add(new Object[]{jsonNode.get("myMap1"), map5}); + + 
Map<String, Object> map6 = new HashMap<>(); + Map<String, Object> map61 = new HashMap<>(); + map61.put("sub1", 10); + map61.put("sub2", 1.0); + map6.put("k3", map61); + map6.put("k4", "baz"); + map6.put("k5", new Object[]{1, 2, 3}); + input.add(new Object[]{jsonNode.get("myMap2"), map6}); + + return input.toArray(new Object[0][]); + } + +} \ No newline at end of file diff --git a/pinot-server/src/test/resources/data/test_data-mv.avro b/pinot-server/src/test/resources/data/test_data-mv.avro index 151e091..aff4520 100644 Binary files a/pinot-server/src/test/resources/data/test_data-mv.avro and b/pinot-server/src/test/resources/data/test_data-mv.avro differ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java index 36c919f..f1fc762 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java @@ -23,6 +23,9 @@ import java.util.Set; /** * Extracts fields from input records + * 1) Number/String/ByteBuffer become single-value column + * 2) Collections become Object[] i.e. multi-value column + * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object> * @param <T> The format of the input record */ public interface RecordExtractor<T> {
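The three numbered rules just added to the RecordExtractor javadoc are the contract the rest of this patch implements. A tiny invented illustration (field names hypothetical, not from the patch):

    import java.util.HashMap;
    import java.util.Map;

    class RecordExtractorContractSketch {
      // Extracting {"age":30,"tags":["a","b"],"addr":{"city":"SF"}} should yield:
      static Map<String, Object> expectedRow() {
        Map<String, Object> row = new HashMap<>();
        row.put("age", 30);                      // 1) single value
        row.put("tags", new Object[]{"a", "b"}); // 2) Collection -> Object[] (multi value)
        Map<Object, Object> addr = new HashMap<>();
        addr.put("city", "SF");
        row.put("addr", addr);                   // 3) nested/complex field -> Map
        return row;
      }
    }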
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java index e3a1f4a..052209a 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.avro.generic.GenericData; import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.data.Schema; import org.testng.Assert; @@ -114,25 +115,35 @@ public abstract class AbstractRecordExtractorTest { String columnName = entry.getKey(); Object expectedValue = entry.getValue(); Object actualValue = genericRow.getValue(columnName); - if (expectedValue instanceof Collection) { - List actualArray = - actualValue instanceof List ? (ArrayList) actualValue : Arrays.asList((Object[]) actualValue); - List expectedArray = (List) expectedValue; - for (int j = 0; j < actualArray.size(); j++) { - Assert.assertEquals(actualArray.get(j), expectedArray.get(j)); - } - } else if (expectedValue instanceof Map) { - Map<Object, Object> actualMap = (HashMap) actualValue; - Map<Object, Object> expectedMap = (HashMap) expectedValue; - for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) { - Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), mapEntry.getValue()); - } + checkValue(expectedValue, actualValue); + } + } + + private void checkValue(Object expectedValue, Object actualValue) { + if (expectedValue instanceof Collection) { + List actualArray = + actualValue instanceof List ? (ArrayList) actualValue : Arrays.asList((Object[]) actualValue); + List expectedArray = (List) expectedValue; + for (int j = 0; j < actualArray.size(); j++) { + checkValue(expectedArray.get(j), actualArray.get(j)); + } + } else if (expectedValue instanceof Map) { + Map<Object, Object> actualMap = (HashMap) actualValue; + Map<Object, Object> expectedMap = (HashMap) expectedValue; + for (Map.Entry<Object, Object> mapEntry : expectedMap.entrySet()) { + Assert.assertEquals(actualMap.get(mapEntry.getKey().toString()), mapEntry.getValue()); + } + } else if (expectedValue instanceof GenericData.Record) { + Map<Object, Object> actualMap = (HashMap) actualValue; + GenericData.Record expectedGenericRecord = (GenericData.Record) expectedValue; + for (Map.Entry<Object, Object> mapEntry : actualMap.entrySet()) { + checkValue(expectedGenericRecord.get(mapEntry.getKey().toString()), mapEntry.getValue()); + } + } else { + if (expectedValue != null) { + Assert.assertEquals(actualValue, expectedValue); } else { - if (expectedValue != null) { - Assert.assertEquals(actualValue, expectedValue); - } else { - Assert.assertNull(actualValue); - } + Assert.assertNull(actualValue); } } }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org