Jackie-Jiang commented on a change in pull request #6930:
URL: https://github.com/apache/incubator-pinot/pull/6930#discussion_r633937767



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, 
String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, 
nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,

Review comment:
       I feel this method is not really useful. It requires the whole JSON file 
to be a single object. In regular cases, the file should contain a list of 
records.

##########
File path: 
pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
##########
@@ -93,48 +94,7 @@
 
   //@formatter:off
   @Argument(handler = SubCommandHandler.class, metaVar = "<subCommand>")
-  @SubCommands({
-      @SubCommand(name = "QuickStart", impl = QuickStartCommand.class),
-      @SubCommand(name = "OperateClusterConfig", impl = 
OperateClusterConfigCommand.class),
-      @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
-      @SubCommand(name = "LaunchDataIngestionJob", impl = 
LaunchDataIngestionJobCommand.class),
-      @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
-      @SubCommand(name = "ImportData", impl = ImportDataCommand.class),
-      @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
-      @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
-      @SubCommand(name = "StreamAvroIntoKafka", impl = 
StreamAvroIntoKafkaCommand.class),
-      @SubCommand(name = "StartController", impl = 
StartControllerCommand.class),
-      @SubCommand(name = "StartBroker", impl = StartBrokerCommand.class),
-      @SubCommand(name = "StartServer", impl = StartServerCommand.class),
-      @SubCommand(name = "StartMinion", impl = StartMinionCommand.class),
-      @SubCommand(name = "StartServiceManager", impl = 
StartServiceManagerCommand.class),
-      @SubCommand(name = "AddTable", impl = AddTableCommand.class),
-      @SubCommand(name = "ChangeTableState", impl = ChangeTableState.class),
-      @SubCommand(name = "AddTenant", impl = AddTenantCommand.class),
-      @SubCommand(name = "AddSchema", impl = AddSchemaCommand.class),
-      @SubCommand(name = "UpdateSchema", impl = AddSchemaCommand.class),
-      @SubCommand(name = "UploadSegment", impl = UploadSegmentCommand.class),
-      @SubCommand(name = "PostQuery", impl = PostQueryCommand.class),
-      @SubCommand(name = "StopProcess", impl = StopProcessCommand.class),
-      @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class),
-      @SubCommand(name = "ShowClusterInfo", impl = 
ShowClusterInfoCommand.class),
-      @SubCommand(name = "AvroSchemaToPinotSchema", impl = 
AvroSchemaToPinotSchema.class),
-      @SubCommand(name = "RebalanceTable", impl = RebalanceTableCommand.class),
-      @SubCommand(name = "ChangeNumReplicas", impl = 
ChangeNumReplicasCommand.class),
-      @SubCommand(name = "ValidateConfig", impl = ValidateConfigCommand.class),
-      @SubCommand(name = "VerifySegmentState", impl = 
VerifySegmentState.class),
-      @SubCommand(name = "ConvertPinotSegment", impl = 
PinotSegmentConvertCommand.class),
-      @SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
-      @SubCommand(name = "VerifyClusterState", impl = 
VerifyClusterStateCommand.class),
-      @SubCommand(name = "RealtimeProvisioningHelper", impl = 
RealtimeProvisioningHelperCommand.class),
-      @SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
-      @SubCommand(name = "CheckOfflineSegmentIntervals", impl = 
OfflineSegmentIntervalCheckerCommand.class),
-      @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class),
-      @SubCommand(name = "GitHubEventsQuickStart", impl = 
GitHubEventsQuickStartCommand.class),
-      @SubCommand(name = "StreamGitHubEvents", impl = 
StreamGitHubEventsCommand.class),
-      @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class),
-      @SubCommand(name = "SegmentProcessorFramework", impl = 
SegmentProcessorFrameworkCommand.class)
-  })
+  @SubCommands({@SubCommand(name = "QuickStart", impl = 
QuickStartCommand.class), @SubCommand(name = "OperateClusterConfig", impl = 
OperateClusterConfigCommand.class), @SubCommand(name = "GenerateData", impl = 
GenerateDataCommand.class), @SubCommand(name = "LaunchDataIngestionJob", impl = 
LaunchDataIngestionJobCommand.class), @SubCommand(name = "CreateSegment", impl 
= CreateSegmentCommand.class), @SubCommand(name = "ImportData", impl = 
ImportDataCommand.class), @SubCommand(name = "StartZookeeper", impl = 
StartZookeeperCommand.class), @SubCommand(name = "StartKafka", impl = 
StartKafkaCommand.class), @SubCommand(name = "StreamAvroIntoKafka", impl = 
StreamAvroIntoKafkaCommand.class), @SubCommand(name = "StartController", impl = 
StartControllerCommand.class), @SubCommand(name = "StartBroker", impl = 
StartBrokerCommand.class), @SubCommand(name = "StartServer", impl = 
StartServerCommand.class), @SubCommand(name = "StartMinion", impl = 
StartMinionCommand.class), @SubCommand(name = "StartSe
 rviceManager", impl = StartServiceManagerCommand.class), @SubCommand(name = 
"AddTable", impl = AddTableCommand.class), @SubCommand(name = 
"ChangeTableState", impl = ChangeTableState.class), @SubCommand(name = 
"AddTenant", impl = AddTenantCommand.class), @SubCommand(name = "AddSchema", 
impl = AddSchemaCommand.class), @SubCommand(name = "UpdateSchema", impl = 
AddSchemaCommand.class), @SubCommand(name = "UploadSegment", impl = 
UploadSegmentCommand.class), @SubCommand(name = "PostQuery", impl = 
PostQueryCommand.class), @SubCommand(name = "StopProcess", impl = 
StopProcessCommand.class), @SubCommand(name = "DeleteCluster", impl = 
DeleteClusterCommand.class), @SubCommand(name = "ShowClusterInfo", impl = 
ShowClusterInfoCommand.class), @SubCommand(name = "AvroSchemaToPinotSchema", 
impl = AvroSchemaToPinotSchema.class), @SubCommand(name = "JsonToPinotSchema", 
impl = JsonToPinotSchema.class), @SubCommand(name = "RebalanceTable", impl = 
RebalanceTableCommand.class), @SubCommand(name = "ChangeNu
 mReplicas", impl = ChangeNumReplicasCommand.class), @SubCommand(name = 
"ValidateConfig", impl = ValidateConfigCommand.class), @SubCommand(name = 
"VerifySegmentState", impl = VerifySegmentState.class), @SubCommand(name = 
"ConvertPinotSegment", impl = PinotSegmentConvertCommand.class), 
@SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class), 
@SubCommand(name = "VerifyClusterState", impl = 
VerifyClusterStateCommand.class), @SubCommand(name = 
"RealtimeProvisioningHelper", impl = RealtimeProvisioningHelperCommand.class), 
@SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class), 
@SubCommand(name = "CheckOfflineSegmentIntervals", impl = 
OfflineSegmentIntervalCheckerCommand.class), @SubCommand(name = 
"AnonymizeData", impl = AnonymizeDataCommand.class), @SubCommand(name = 
"GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class), 
@SubCommand(name = "StreamGitHubEvents", impl = 
StreamGitHubEventsCommand.class), @SubCommand(name = "BootstrapTable", impl =
  BootstrapTableCommand.class), @SubCommand(name = "SegmentProcessorFramework", 
impl = SegmentProcessorFrameworkCommand.class)})

Review comment:
       Please revert this reformatting (the formatter should already be turned off by the `//@formatter:off` marker above the annotation).

##########
File path: 
pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
##########
@@ -0,0 +1,143 @@
+package org.apache.pinot.tools.admin.command;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class for command to infer pinot schema from Json data. Given that it is 
not always possible to
+ * automatically do this, the intention is to get most of the work done by 
this class, and require any
+ * manual editing on top.
+ */
+public class JsonToPinotSchema extends AbstractBaseAdminCommand implements 
Command {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonToPinotSchema.class);
+
+  @Option(name = "-jsonFile", required = true, metaVar = "<String>", usage = 
"Path to json file.")
+  String _jsonFile;
+
+  @Option(name = "-outputDir", required = true, metaVar = "<string>", usage = 
"Path to output directory")
+  String _outputDir;
+
+  @Option(name = "-pinotSchemaName", required = true, metaVar = "<string>", 
usage = "Pinot schema name")
+  String _pinotSchemaName;
+
+  @Option(name = "-dimensions", metaVar = "<string>", usage = "Comma separated 
dimension column names.")
+  String _dimensions;
+
+  @Option(name = "-metrics", metaVar = "<string>", usage = "Comma separated 
metric column names.")
+  String _metrics;
+
+  @Option(name = "-timeColumnName", metaVar = "<string>", usage = "Name of the 
time column.")

Review comment:
       We already deprecated the TIME field; should we change this option to `-dateTimeColumns`?

##########
File path: pinot-spi/src/test/resources/json_util_test.json
##########
@@ -0,0 +1,25 @@
+{
+  "entries": [
+    {
+      "id": 1234,
+      "description": "entry1"
+    },
+    {
+      "id": 1235,
+      "description": "entry2"
+    }
+  ],
+  "tuple": {
+    "address": {
+      "streetaddress": "1st Ave",
+      "city": "Palo Alto"
+    }
+  },
+  "d2": [
+    1,
+    2
+  ],
+  "d1": "dim1",
+  "hoursSinceEpoch": 1621286582,
+  "m1": 12
+}

Review comment:
       Add a new line at the end of the file.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, 
String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, 
nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,

Review comment:
       `unnestFields` should also be annotated `@Nullable`.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, 
String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, 
nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter)
+      throws IOException {
+    JsonNode jsonNode = fileToJsonNode(jsonFile);
+    Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an 
object");
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, 
unnestFields, delimiter);
+  }
+
+  public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    Schema pinotSchema = new Schema();
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      JsonNode childNode = fieldEntry.getValue();
+      inferPinotSchemaFromJsonNode(childNode, pinotSchema, 
fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+          delimiter);
+    }
+    return pinotSchema;
+  }
+
+  private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema 
pinotSchema, String path,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    if (jsonNode.isNull()) {
+      // do nothing
+      return;

Review comment:
       Should we throw an exception when the schema cannot be inferred?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String, 
String>> currentResults,
       unnestResults(newCurrentResults, nestedResultsList, index + 1, 
nonNestedResult, outputResults);
     }
   }
+
+  public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter)
+      throws IOException {
+    JsonNode jsonNode = fileToJsonNode(jsonFile);
+    Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an 
object");
+    return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit, 
unnestFields, delimiter);
+  }
+
+  public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    Schema pinotSchema = new Schema();
+    Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+    while (fieldIterator.hasNext()) {
+      Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+      JsonNode childNode = fieldEntry.getValue();
+      inferPinotSchemaFromJsonNode(childNode, pinotSchema, 
fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+          delimiter);
+    }
+    return pinotSchema;
+  }
+
+  private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema 
pinotSchema, String path,
+      @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable 
TimeUnit timeUnit, List<String> unnestFields,
+      String delimiter) {
+    if (jsonNode.isNull()) {
+      // do nothing
+      return;
+    } else if (jsonNode.isValueNode()) {
+      DataType dataType = valueOf(jsonNode);
+      addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap, 
timeUnit);
+    } else if (jsonNode.isArray()) {
+      int numChildren = jsonNode.size();
+      if (numChildren == 0) {
+        // do nothing
+        return;
+      }
+      JsonNode childNode = jsonNode.get(0);
+
+      if (unnestFields.contains(path)) {
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, path, 
fieldTypeMap, timeUnit, unnestFields, delimiter);
+      } else if (childNode.isValueNode()) {
+        addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false, 
fieldTypeMap, timeUnit);
+      } else {
+        addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true, 
fieldTypeMap, timeUnit);
+      }
+    } else if (jsonNode.isObject()) {
+      Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+      while (fieldIterator.hasNext()) {
+        Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+        JsonNode childNode = fieldEntry.getValue();
+        inferPinotSchemaFromJsonNode(childNode, pinotSchema, 
String.join(delimiter, path, fieldEntry.getKey()),
+            fieldTypeMap, timeUnit, unnestFields, delimiter);
+      }
+    } else {
+      throw new IllegalArgumentException(String.format("Unsupported json node 
type", jsonNode.getClass()));
+    }
+  }
+
+  /**
+   * Returns the data type stored in Pinot that is associated with the given 
Avro type.
+   */
+  public static DataType valueOf(JsonNode jsonNode) {
+    if (jsonNode.isInt()) {
+      return DataType.INT;
+    } else if (jsonNode.isLong()) {
+      return DataType.LONG;
+    } else if (jsonNode.isFloat()) {
+      return DataType.FLOAT;
+    } else if (jsonNode.isDouble()) {
+      return DataType.DOUBLE;
+    } else if (jsonNode.isBoolean()) {
+      return DataType.BOOLEAN;
+    } else if (jsonNode.isBinary()) {
+      return DataType.BYTES;
+    } else {
+      return DataType.STRING;
+    }
+  }
+
+  private static void addFieldToPinotSchema(Schema pinotSchema, DataType 
dataType, String name,
+      boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType> 
fieldTypeMap,
+      @Nullable TimeUnit timeUnit) {
+    if (fieldTypeMap == null) {
+      pinotSchema.addField(new DimensionFieldSpec(name, dataType, 
isSingleValueField));
+    } else {
+      FieldSpec.FieldType fieldType =
+          fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) : 
FieldSpec.FieldType.DIMENSION;
+      Preconditions.checkNotNull(fieldType, "Field type not specified for 
field: %s", name);
+      switch (fieldType) {
+        case DIMENSION:
+          pinotSchema.addField(new DimensionFieldSpec(name, dataType, 
isSingleValueField));
+          break;
+        case METRIC:
+          Preconditions.checkState(isSingleValueField, "Metric field: %s 
cannot be multi-valued", name);
+          pinotSchema.addField(new MetricFieldSpec(name, dataType));
+          break;
+        case TIME:

Review comment:
       We already deprecated the TIME field. Let's only support `DATE_TIME` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to