This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8897e  Simplify SegmentGenerationAndPushTask handling getting schema and table config (#6469)
8c8897e is described below

commit 8c8897e36a3c9cbc27793576bb139208a508484c
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Fri Jan 22 01:26:54 2021 -0800

    Simplify SegmentGenerationAndPushTask handling getting schema and table config (#6469)
    
    * Update SegmentGenerationAndPushTaskGenerator to set schemaURI and tableConfigURI by default
    
    * Address comments
    
    * Address comments
---
 .../SegmentGenerationAndPushTaskGenerator.java     | 11 ++++-------
 .../SegmentGenerationAndPushTaskExecutor.java      | 22 +++++++++++++++-------
 .../batch/common/SegmentGenerationTaskRunner.java  |  7 +++----
 .../batch/hadoop/HadoopSegmentCreationMapper.java  |  3 +--
 .../spark/SparkSegmentGenerationJobRunner.java     |  3 +--
 .../standalone/SegmentGenerationJobRunner.java     |  2 +-
 .../spi/ingestion/batch/BatchConfigProperties.java |  4 ++--
 .../batch/spec/SegmentGenerationTaskSpec.java      |  7 ++++---
 8 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 3ea3d31..105be93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.helix.core.minion.generator;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
@@ -51,7 +50,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,7 +225,7 @@ public class SegmentGenerationAndPushTaskGenerator 
implements PinotTaskGenerator
 
   private Map<String, String> getSingleFileGenerationTaskConfig(String 
offlineTableName, int sequenceID,
       Map<String, String> batchConfigMap, URI inputFileURI)
-      throws JsonProcessingException, URISyntaxException {
+      throws URISyntaxException {
 
     URI inputDirURI = 
getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
     URI outputDirURI = null;
@@ -236,15 +235,13 @@ public class SegmentGenerationAndPushTaskGenerator 
implements PinotTaskGenerator
     String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
 
     Map<String, String> singleFileGenerationTaskConfig = new 
HashMap<>(batchConfigMap);
+    singleFileGenerationTaskConfig
+        .put(BatchConfigProperties.TABLE_NAME, 
TableNameBuilder.OFFLINE.tableNameWithType(offlineTableName));
     
singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY,
 inputFileURI.toString());
     if (outputDirURI != null) {
       URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, 
inputFileURI, outputDirURI);
       
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
 outputSegmentDirURI.toString());
     }
-    singleFileGenerationTaskConfig.put(BatchConfigProperties.SCHEMA,
-        
JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
-    singleFileGenerationTaskConfig.put(BatchConfigProperties.TABLE_CONFIGS,
-        
JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
     singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, 
String.valueOf(sequenceID));
     singleFileGenerationTaskConfig
         .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, 
BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
index 535248f..76f22ae 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.minion.executor;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -33,6 +32,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
@@ -97,7 +97,8 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
   private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
 
   @Override
-  public Object executeTask(PinotTaskConfig pinotTaskConfig) throws Exception {
+  public Object executeTask(PinotTaskConfig pinotTaskConfig)
+      throws Exception {
     LOGGER.info("Executing SegmentGenerationAndPushTask with task config: {}", 
pinotTaskConfig);
     Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
     SegmentGenerationAndPushResult.Builder resultBuilder = new 
SegmentGenerationAndPushResult.Builder();
@@ -118,8 +119,7 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
 
       resultBuilder.setSegmentName(segmentName);
       // Segment push task
-      
pushSegment(taskSpec.getTableConfig().get(BatchConfigProperties.TABLE_NAME).asText(),
 taskConfigs,
-          outputSegmentTarURI);
+      pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, 
outputSegmentTarURI);
       resultBuilder.setSucceed(true);
     } catch (Exception e) {
       throw new RuntimeException("Failed to execute 
SegmentGenerationAndPushTask", e);
@@ -281,6 +281,8 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     
recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS));
     
recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
     taskSpec.setRecordReaderSpec(recordReaderSpec);
+
+    String tableNameWithType = 
taskConfigs.get(BatchConfigProperties.TABLE_NAME);
     Schema schema;
     if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
       schema = JsonUtils
@@ -288,11 +290,17 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
       schema = 
SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
     } else {
-      throw new RuntimeException(
-          "Missing schema for segment generation job: please set `schema` or 
`schemaURI` in task config.");
+      schema = getSchema(tableNameWithType);
     }
     taskSpec.setSchema(schema);
-    JsonNode tableConfig = 
JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS));
+    TableConfig tableConfig;
+    if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) {
+      tableConfig = 
JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), 
TableConfig.class);
+    } else if 
(taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) {
+      tableConfig = 
SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI));
+    } else {
+      tableConfig = getTableConfig(tableNameWithType);
+    }
     taskSpec.setTableConfig(tableConfig);
     
taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = new 
SegmentNameGeneratorSpec();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 7b9a5ef..f4cf01e 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -70,7 +70,7 @@ public class SegmentGenerationTaskRunner implements 
Serializable {
 
   public String run()
       throws Exception {
-    TableConfig tableConfig = 
JsonUtils.jsonNodeToObject(_taskSpec.getTableConfig(), TableConfig.class);
+    TableConfig tableConfig = _taskSpec.getTableConfig();
     String tableName = tableConfig.getTableName();
     Schema schema = _taskSpec.getSchema();
 
@@ -109,9 +109,8 @@ public class SegmentGenerationTaskRunner implements 
Serializable {
     return segmentIndexCreationDriver.getSegmentName();
   }
 
-  private SegmentNameGenerator getSegmentNameGenerator()
-      throws IOException {
-    TableConfig tableConfig = 
JsonUtils.jsonNodeToObject(_taskSpec.getTableConfig(), TableConfig.class);
+  private SegmentNameGenerator getSegmentNameGenerator() {
+    TableConfig tableConfig = _taskSpec.getTableConfig();
     String tableName = tableConfig.getTableName();
 
     Schema schema = _taskSpec.getSchema();
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index 5b5d6f5..eee3780 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -136,8 +136,7 @@ public class HadoopSegmentCreationMapper extends 
Mapper<LongWritable, Text, Long
       taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
       taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
       
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-      taskSpec
-          
.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+      
taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
       taskSpec.setSequenceId(idx);
       
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
       
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, 
inputFileURI.toString());
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 5be70f2..eeeceb2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -300,8 +300,7 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
           
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
           taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
           
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-          taskSpec.setTableConfig(
-              
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+          
taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()));
           taskSpec.setSequenceId(idx);
           
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
           
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, 
inputFileURI.toString());
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index db1b11a..25e7c5b 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -186,7 +186,7 @@ public class SegmentGenerationJobRunner implements 
IngestionJobRunner {
         taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
         taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
         taskSpec.setSchema(schema);
-        taskSpec.setTableConfig(tableConfig.toJsonNode());
+        taskSpec.setTableConfig(tableConfig);
         taskSpec.setSequenceId(i);
         
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
         
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, 
inputFileURI.toString());
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index be0b917..5a28884 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -22,8 +22,6 @@ package org.apache.pinot.spi.ingestion.batch;
  * Defines all the keys used in the batch configs map
  */
 public class BatchConfigProperties {
-
-  public static final String TABLE_CONFIGS = "tableConfigs";
   public static final String TABLE_NAME = "tableName";
 
   public static final String INPUT_DIR_URI = "inputDirURI";
@@ -38,6 +36,8 @@ public class BatchConfigProperties {
   public static final String RECORD_READER_CLASS = "recordReader.className";
   public static final String RECORD_READER_CONFIG_CLASS = 
"recordReader.configClassName";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
+  public static final String TABLE_CONFIGS = "tableConfigs";
+  public static final String TABLE_CONFIGS_URI = "tableConfigsURI";
   public static final String SCHEMA = "schema";
   public static final String SCHEMA_URI = "schemaURI";
   public static final String SEQUENCE_ID = "sequenceId";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index d19d306..2edfba6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -36,7 +37,7 @@ public class SegmentGenerationTaskSpec implements 
Serializable {
   /**
    * Table config to create segment
    */
-  private JsonNode _tableConfig;
+  private TableConfig _tableConfig;
 
   /**
    * Table schema
@@ -73,11 +74,11 @@ public class SegmentGenerationTaskSpec implements 
Serializable {
    */
   private Map<String, String> _customProperties = new HashMap<>();
 
-  public JsonNode getTableConfig() {
+  public TableConfig getTableConfig() {
     return _tableConfig;
   }
 
-  public void setTableConfig(JsonNode tableConfig) {
+  public void setTableConfig(TableConfig tableConfig) {
     _tableConfig = tableConfig;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to