This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 6f4790f  add command to infer pinot schema from json data (#6930)
6f4790f is described below

commit 6f4790fcb4c256cb2caa0fc747ae92bd68b9f420
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Wed May 19 13:10:58 2021 -0700

    add command to infer pinot schema from json data (#6930)

    * add command to infer pinot schema from json data

    * style

    * style

    * comments

    * fix tests

    * comments

    * style

    * style
---
 .../java/org/apache/pinot/spi/utils/JsonUtils.java | 140 ++++++++++++++++++
 .../org/apache/pinot/spi/utils/JsonUtilsTest.java  |  55 +++++++
 pinot-spi/src/test/resources/json_util_test.json   |  50 +++++++
 .../pinot/tools/admin/PinotAdministrator.java      |   2 +
 .../tools/admin/command/JsonToPinotSchema.java     | 161 +++++++++++++++++++++
 5 files changed, 408 insertions(+)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 1069e13..c427640 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.spi.utils;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -39,9 +41,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
 
 
 public class JsonUtils {
@@ -95,6 +104,23 @@ public class JsonUtils {
     }
   }
 
+  /**
+   * Reads the first JSON object from a file that may contain multiple objects.
+   */
+  public static JsonNode fileToFirstJsonNode(File jsonFile)
+      throws IOException {
+    try (InputStream inputStream = new FileInputStream(jsonFile)) {
+      JsonFactory jf = new JsonFactory();
+      JsonParser jp = jf.createParser(inputStream);
+      jp.setCodec(DEFAULT_MAPPER);
+      jp.nextToken();
+      if (jp.hasCurrentToken()) {
+        return DEFAULT_MAPPER.readTree(jp);
+      }
+      return null;
+    }
+  }
+
   public static <T> T inputStreamToObject(InputStream jsonInputStream, Class<T> valueType)
       throws IOException {
     return DEFAULT_READER.forType(valueType).readValue(jsonInputStream);
@@ -380,4 +406,118 @@ public class JsonUtils {
       unnestResults(newCurrentResults, nestedResultsList, index + 1, nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit,
+      @Nullable List<String> unnestFields, String delimiter)
+      throws IOException {
+    JsonNode jsonNode = fileToFirstJsonNode(jsonFile);
+    if (unnestFields == null) {
+      unnestFields = new ArrayList<>();
+    }
+    Preconditions.checkState(jsonNode.isObject(), "The JSON data must be an object");
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, unnestFields, delimiter);
+  }
+
+  public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    Schema pinotSchema = new Schema();
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      JsonNode childNode = fieldEntry.getValue();
+      inferPinotSchemaFromJsonNode(childNode, pinotSchema, fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+          delimiter);
+    }
+    return pinotSchema;
+  }
+
+  private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema pinotSchema, String path,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    if (jsonNode.isNull()) {
+      // null values contribute no field
+      return;
+    } else if (jsonNode.isValueNode()) {
+      DataType dataType = valueOf(jsonNode);
+      addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap, timeUnit);
+    } else if (jsonNode.isArray()) {
+      int numChildren = jsonNode.size();
+      if (numChildren == 0) {
+        // empty arrays carry no type information
+        return;
+      }
+      JsonNode childNode = jsonNode.get(0);
+
+      if (unnestFields.contains(path)) {
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, path, fieldTypeMap, timeUnit, unnestFields, delimiter);
+      } else if (childNode.isValueNode()) {
+        addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false, fieldTypeMap, timeUnit);
+      } else {
+        addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, fieldTypeMap, timeUnit);
+      }
+    } else if (jsonNode.isObject()) {
+      Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+      while (fieldIterator.hasNext()) {
+        Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+        JsonNode childNode = fieldEntry.getValue();
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, String.join(delimiter, path, fieldEntry.getKey()),
+            fieldTypeMap, timeUnit, unnestFields, delimiter);
+      }
+    } else {
+      throw new IllegalArgumentException(String.format("Unsupported JSON node type: %s", jsonNode.getClass()));
+    }
+  }
+
+  /**
+   * Returns the Pinot data type associated with the given JSON node type.
+   */
+  public static DataType valueOf(JsonNode jsonNode) {
+    if (jsonNode.isInt()) {
+      return DataType.INT;
+    } else if (jsonNode.isLong()) {
+      return DataType.LONG;
+    } else if (jsonNode.isFloat()) {
+      return DataType.FLOAT;
+    } else if (jsonNode.isDouble()) {
+      return DataType.DOUBLE;
+    } else if (jsonNode.isBoolean()) {
+      return DataType.BOOLEAN;
+    } else if (jsonNode.isBinary()) {
+      return DataType.BYTES;
+    } else {
+      return DataType.STRING;
+    }
+  }
+
+  private static void addFieldToPinotSchema(Schema pinotSchema, DataType dataType, String name,
+      boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap,
+      @Nullable TimeUnit timeUnit) {
+    if (fieldTypeMap == null) {
+      pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+    } else {
+      FieldSpec.FieldType fieldType =
+          fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) : FieldSpec.FieldType.DIMENSION;
+      Preconditions.checkNotNull(fieldType, "Field type not specified for field: %s", name);
+      switch (fieldType) {
+        case DIMENSION:
+          pinotSchema.addField(new DimensionFieldSpec(name, dataType, isSingleValueField));
+          break;
+        case METRIC:
+          Preconditions.checkState(isSingleValueField, "Metric field: %s cannot be multi-valued", name);
+          pinotSchema.addField(new MetricFieldSpec(name, dataType));
+          break;
+        case DATE_TIME:
+          Preconditions.checkState(isSingleValueField, "Time field: %s cannot be multi-valued", name);
+          Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+          pinotSchema.addField(new DateTimeFieldSpec(name, dataType,
+              new DateTimeFormatSpec(1, timeUnit.toString(), DateTimeFieldSpec.TimeFormat.EPOCH.toString()).getFormat(),
+              new DateTimeGranularitySpec(1, timeUnit).getGranularity()));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported field type: " + fieldType + " for field: " + name);
+      }
+    }
+  }
 }
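A minimal usage sketch of the new JsonUtils entry point, assuming a hypothetical input
file /tmp/sample.json; the column names follow the test resource further below:

    import java.io.File;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.JsonUtils;

    public class InferSchemaExample {
      public static void main(String[] args) throws Exception {
        // Columns missing from this map default to DIMENSION.
        Map<String, FieldSpec.FieldType> fieldTypeMap = new HashMap<>();
        fieldTypeMap.put("m1", FieldSpec.FieldType.METRIC);
        fieldTypeMap.put("hoursSinceEpoch", FieldSpec.FieldType.DATE_TIME);
        // Infer a schema from the first JSON object in the file, joining nested
        // field names with "." and unnesting no array fields.
        Schema schema = JsonUtils.getPinotSchemaFromJsonFile(new File("/tmp/sample.json"),
            fieldTypeMap, TimeUnit.HOURS, Collections.emptyList(), ".");
        System.out.println(schema.toPrettyJsonString());
      }
    }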
Lists.newArrayList("entries"), "."); + expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING) + .addMetric("m1", FieldSpec.DataType.INT) + .addSingleValueDimension("tuple.address.streetaddress", FieldSpec.DataType.STRING) + .addSingleValueDimension("tuple.address.city", FieldSpec.DataType.STRING) + .addSingleValueDimension("entries.id", FieldSpec.DataType.INT) + .addSingleValueDimension("entries.description", FieldSpec.DataType.STRING) + .addMultiValueDimension("d2", FieldSpec.DataType.INT) + .addDateTime("hoursSinceEpoch",FieldSpec.DataType.INT, "1:HOURS:EPOCH","1:HOURS").build(); + Assert.assertEquals(inferredPinotSchema, expectedSchema); + + // change delimiter + inferredPinotSchema = + JsonUtils.getPinotSchemaFromJsonFile(file, fieldSpecMap, TimeUnit.HOURS, Lists.newArrayList(""), "_"); + expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING) + .addMetric("m1", FieldSpec.DataType.INT) + .addSingleValueDimension("tuple_address_streetaddress", FieldSpec.DataType.STRING) + .addSingleValueDimension("tuple_address_city", FieldSpec.DataType.STRING) + .addSingleValueDimension("entries", FieldSpec.DataType.STRING) + .addMultiValueDimension("d2", FieldSpec.DataType.INT) + .addDateTime("hoursSinceEpoch",FieldSpec.DataType.INT, "1:HOURS:EPOCH","1:HOURS").build(); + Assert.assertEquals(inferredPinotSchema, expectedSchema); + } } diff --git a/pinot-spi/src/test/resources/json_util_test.json b/pinot-spi/src/test/resources/json_util_test.json new file mode 100644 index 0000000..281d772 --- /dev/null +++ b/pinot-spi/src/test/resources/json_util_test.json @@ -0,0 +1,50 @@ +{ + "entries": [ + { + "id": 1234, + "description": "entry1" + }, + { + "id": 1235, + "description": "entry2" + } + ], + "tuple": { + "address": { + "streetaddress": "1st Ave", + "city": "Palo Alto" + } + }, + "d2": [ + 1, + 2 + ], + "d1": "dim1", + "hoursSinceEpoch": 1621286582, + "m1": 12 +} +{ + "entries": [ + { + "id": 1236, + "description": "entry1" + }, + { + "id": 1237, + "description": "entry2" + } + ], + "tuple": { + "address": { + "streetaddress": "1st Ave", + "city": "Palo Alto" + } + }, + "d2": [ + 1, + 2 + ], + "d1": "dim2", + "hoursSinceEpoch": 1621286582, + "m1": 12 +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java index be217a4..3b4a768 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java @@ -36,6 +36,7 @@ import org.apache.pinot.tools.admin.command.DeleteClusterCommand; import org.apache.pinot.tools.admin.command.GenerateDataCommand; import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand; import org.apache.pinot.tools.admin.command.ImportDataCommand; +import org.apache.pinot.tools.admin.command.JsonToPinotSchema; import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand; import org.apache.pinot.tools.admin.command.MoveReplicaGroup; import org.apache.pinot.tools.admin.command.OfflineSegmentIntervalCheckerCommand; @@ -119,6 +120,7 @@ public class PinotAdministrator { @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class), @SubCommand(name = "ShowClusterInfo", impl = ShowClusterInfoCommand.class), @SubCommand(name = "AvroSchemaToPinotSchema", impl = AvroSchemaToPinotSchema.class), + @SubCommand(name = "JsonToPinotSchema", impl = 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index be217a4..3b4a768 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.tools.admin.command.DeleteClusterCommand;
 import org.apache.pinot.tools.admin.command.GenerateDataCommand;
 import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand;
 import org.apache.pinot.tools.admin.command.ImportDataCommand;
+import org.apache.pinot.tools.admin.command.JsonToPinotSchema;
 import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand;
 import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
 import org.apache.pinot.tools.admin.command.OfflineSegmentIntervalCheckerCommand;
@@ -119,6 +120,7 @@ public class PinotAdministrator {
     @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class),
     @SubCommand(name = "ShowClusterInfo", impl = ShowClusterInfoCommand.class),
     @SubCommand(name = "AvroSchemaToPinotSchema", impl = AvroSchemaToPinotSchema.class),
+    @SubCommand(name = "JsonToPinotSchema", impl = JsonToPinotSchema.class),
     @SubCommand(name = "RebalanceTable", impl = RebalanceTableCommand.class),
     @SubCommand(name = "ChangeNumReplicas", impl = ChangeNumReplicasCommand.class),
     @SubCommand(name = "ValidateConfig", impl = ValidateConfigCommand.class),
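Assuming the standard pinot-admin.sh launcher, the newly registered subcommand would be
invoked along these lines (all paths and names illustrative):

    bin/pinot-admin.sh JsonToPinotSchema -jsonFile /tmp/sample.json -outputDir /tmp/schemas \
        -pinotSchemaName mySchema -dimensions d1 -metrics m1 \
        -dateTimeColumnName hoursSinceEpoch -timeUnit HOURS -unnestFields entries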
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
new file mode 100644
index 0000000..eb29477
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Command to infer a Pinot schema from JSON data. Since schema inference cannot always be
+ * fully automated, the intention is for this class to do most of the work, with any
+ * remaining manual edits applied on top.
+ */
+public class JsonToPinotSchema extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JsonToPinotSchema.class);
+
+  @Option(name = "-jsonFile", required = true, metaVar = "<string>", usage = "Path to json file.")
+  String _jsonFile;
+
+  @Option(name = "-outputDir", required = true, metaVar = "<string>", usage = "Path to output directory.")
+  String _outputDir;
+
+  @Option(name = "-pinotSchemaName", required = true, metaVar = "<string>", usage = "Pinot schema name.")
+  String _pinotSchemaName;
+
+  @Option(name = "-dimensions", metaVar = "<string>", usage = "Comma-separated dimension column names.")
+  String _dimensions;
+
+  @Option(name = "-metrics", metaVar = "<string>", usage = "Comma-separated metric column names.")
+  String _metrics;
+
+  @Option(name = "-dateTimeColumnName", metaVar = "<string>", usage = "Name of the dateTime column.")
+  String _dateTimeColumnName;
+
+  @Option(name = "-timeUnit", metaVar = "<string>", usage = "Unit of the time column (default DAYS).")
+  TimeUnit _timeUnit = TimeUnit.DAYS;
+
+  @Option(name = "-unnestFields", metaVar = "<string>", usage = "Comma-separated fields to unnest.")
+  String _unnestFields;
+
+  @Option(name = "-delimiter", metaVar = "<string>", usage = "The delimiter separating components in nested structure, defaults to dot.")
+  String _delimiter;
+
+  @SuppressWarnings("FieldCanBeLocal")
+  @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
+  private boolean _help = false;
+
+  @Override
+  public boolean execute()
+      throws Exception {
+    if (_dimensions == null && _metrics == null && _dateTimeColumnName == null) {
+      LOGGER.error(
+          "Error: Missing required argument, please specify at least one of -dimensions, -metrics, -dateTimeColumnName");
+      return false;
+    }
+
+    Schema schema = JsonUtils
+        .getPinotSchemaFromJsonFile(new File(_jsonFile), buildFieldTypesMap(), _timeUnit, buildUnnestFields(),
+            getDelimiter());
+    schema.setSchemaName(_pinotSchemaName);
+
+    File outputDir = new File(_outputDir);
+    if (!outputDir.isDirectory()) {
+      LOGGER.error("ERROR: Output directory: {} does not exist or is not a directory", _outputDir);
+      return false;
+    }
+    File outputFile = new File(outputDir, _pinotSchemaName + ".json");
+    LOGGER.info("Store Pinot schema to file: {}", outputFile.getAbsolutePath());
+
+    try (FileWriter writer = new FileWriter(outputFile)) {
+      writer.write(schema.toPrettyJsonString());
+    }
+
+    return true;
+  }
+
+  @Override
+  public String description() {
+    return "Extracts a Pinot schema file from a JSON data file.";
+  }
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public String toString() {
+    return "JsonToPinotSchema -jsonFile " + _jsonFile + " -outputDir " + _outputDir + " -pinotSchemaName "
+        + _pinotSchemaName + " -dimensions " + _dimensions + " -metrics " + _metrics + " -dateTimeColumnName "
+        + _dateTimeColumnName + " -timeUnit " + _timeUnit;
+  }
+
+  /**
+   * Builds a map from column name to field type (dimension/metric/dateTime) from the
+   * option values.
+   *
+   * @return The column-to-fieldType map.
+   */
+  private Map<String, FieldSpec.FieldType> buildFieldTypesMap() {
+    Map<String, FieldSpec.FieldType> fieldTypes = new HashMap<>();
+    if (_dimensions != null) {
+      for (String column : _dimensions.split("\\s*,\\s*")) {
+        fieldTypes.put(column, FieldSpec.FieldType.DIMENSION);
+      }
+    }
+    if (_metrics != null) {
+      for (String column : _metrics.split("\\s*,\\s*")) {
+        fieldTypes.put(column, FieldSpec.FieldType.METRIC);
+      }
+    }
+    if (_dateTimeColumnName != null) {
+      fieldTypes.put(_dateTimeColumnName, FieldSpec.FieldType.DATE_TIME);
+    }
+    return fieldTypes;
+  }
+
+  private List<String> buildUnnestFields() {
+    List<String> unnestFields = new ArrayList<>();
+    if (_unnestFields != null) {
+      for (String field : _unnestFields.split(",")) {
+        unnestFields.add(field.trim());
+      }
+    }
+    return unnestFields;
+  }
+
+  private String getDelimiter() {
+    return _delimiter == null ? ComplexTypeTransformer.DEFAULT_DELIMITER : _delimiter;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org