This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8c8897e Simplify SegmentGenerationAndPushTask handling getting schema and table config (#6469) 8c8897e is described below commit 8c8897e36a3c9cbc27793576bb139208a508484c Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Fri Jan 22 01:26:54 2021 -0800 Simplify SegmentGenerationAndPushTask handling getting schema and table config (#6469) * Update SegmentGenerationAndPushTaskGenerator to set schemaURI and tableConfigURI by default * Address comments * Address comments --- .../SegmentGenerationAndPushTaskGenerator.java | 11 ++++------- .../SegmentGenerationAndPushTaskExecutor.java | 22 +++++++++++++++------- .../batch/common/SegmentGenerationTaskRunner.java | 7 +++---- .../batch/hadoop/HadoopSegmentCreationMapper.java | 3 +-- .../spark/SparkSegmentGenerationJobRunner.java | 3 +-- .../standalone/SegmentGenerationJobRunner.java | 2 +- .../spi/ingestion/batch/BatchConfigProperties.java | 4 ++-- .../batch/spec/SegmentGenerationTaskSpec.java | 7 ++++--- 8 files changed, 31 insertions(+), 28 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java index 3ea3d31..105be93 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.controller.helix.core.minion.generator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; @@ -51,7 +50,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory; import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.IngestionConfigUtils; -import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,7 +225,7 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator private Map<String, String> getSingleFileGenerationTaskConfig(String offlineTableName, int sequenceID, Map<String, String> batchConfigMap, URI inputFileURI) - throws JsonProcessingException, URISyntaxException { + throws URISyntaxException { URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); URI outputDirURI = null; @@ -236,15 +235,13 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap); Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap); + singleFileGenerationTaskConfig + .put(BatchConfigProperties.TABLE_NAME, TableNameBuilder.OFFLINE.tableNameWithType(offlineTableName)); singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); if (outputDirURI != null) { URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI); singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString()); } - singleFileGenerationTaskConfig.put(BatchConfigProperties.SCHEMA, - JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName))); - singleFileGenerationTaskConfig.put(BatchConfigProperties.TABLE_CONFIGS, - JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName))); singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(sequenceID)); singleFileGenerationTaskConfig 
.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java index 535248f..76f22ae 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.minion.executor; -import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.io.IOException; import java.net.URI; @@ -33,6 +32,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.LocalPinotFS; @@ -97,7 +97,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; @Override - public Object executeTask(PinotTaskConfig pinotTaskConfig) throws Exception { + public Object executeTask(PinotTaskConfig pinotTaskConfig) + throws Exception { LOGGER.info("Executing SegmentGenerationAndPushTask with task config: {}", pinotTaskConfig); Map<String, String> taskConfigs = pinotTaskConfig.getConfigs(); SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder(); @@ -118,8 +119,7 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { resultBuilder.setSegmentName(segmentName); 
// Segment push task - pushSegment(taskSpec.getTableConfig().get(BatchConfigProperties.TABLE_NAME).asText(), taskConfigs, - outputSegmentTarURI); + pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, outputSegmentTarURI); resultBuilder.setSucceed(true); } catch (Exception e) { throw new RuntimeException("Failed to execute SegmentGenerationAndPushTask", e); @@ -281,6 +281,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS)); recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS)); taskSpec.setRecordReaderSpec(recordReaderSpec); + + String tableNameWithType = taskConfigs.get(BatchConfigProperties.TABLE_NAME); Schema schema; if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) { schema = JsonUtils @@ -288,11 +290,17 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) { schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI)); } else { - throw new RuntimeException( - "Missing schema for segment generation job: please set `schema` or `schemaURI` in task config."); + schema = getSchema(tableNameWithType); } taskSpec.setSchema(schema); - JsonNode tableConfig = JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS)); + TableConfig tableConfig; + if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) { + tableConfig = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class); + } else if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) { + tableConfig = SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI)); + } else { + tableConfig = getTableConfig(tableNameWithType); + } taskSpec.setTableConfig(tableConfig); 
taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID))); SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java index 7b9a5ef..f4cf01e 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java @@ -70,7 +70,7 @@ public class SegmentGenerationTaskRunner implements Serializable { public String run() throws Exception { - TableConfig tableConfig = JsonUtils.jsonNodeToObject(_taskSpec.getTableConfig(), TableConfig.class); + TableConfig tableConfig = _taskSpec.getTableConfig(); String tableName = tableConfig.getTableName(); Schema schema = _taskSpec.getSchema(); @@ -109,9 +109,8 @@ public class SegmentGenerationTaskRunner implements Serializable { return segmentIndexCreationDriver.getSegmentName(); } - private SegmentNameGenerator getSegmentNameGenerator() - throws IOException { - TableConfig tableConfig = JsonUtils.jsonNodeToObject(_taskSpec.getTableConfig(), TableConfig.class); + private SegmentNameGenerator getSegmentNameGenerator() { + TableConfig tableConfig = _taskSpec.getTableConfig(); String tableName = tableConfig.getTableName(); Schema schema = _taskSpec.getSchema(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java index 5b5d6f5..eee3780 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java @@ -136,8 +136,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); - taskSpec - .setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); + taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI())); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index 5be70f2..eeeceb2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -300,8 +300,7 @@ 
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); - taskSpec.setTableConfig( - SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); + taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI())); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index db1b11a..25e7c5b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -186,7 +186,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); taskSpec.setSchema(schema); - taskSpec.setTableConfig(tableConfig.toJsonNode()); + taskSpec.setTableConfig(tableConfig); taskSpec.setSequenceId(i); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, 
inputFileURI.toString()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index be0b917..5a28884 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -22,8 +22,6 @@ package org.apache.pinot.spi.ingestion.batch; * Defines all the keys used in the batch configs map */ public class BatchConfigProperties { - - public static final String TABLE_CONFIGS = "tableConfigs"; public static final String TABLE_NAME = "tableName"; public static final String INPUT_DIR_URI = "inputDirURI"; @@ -38,6 +36,8 @@ public class BatchConfigProperties { public static final String RECORD_READER_CLASS = "recordReader.className"; public static final String RECORD_READER_CONFIG_CLASS = "recordReader.configClassName"; public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop"; + public static final String TABLE_CONFIGS = "tableConfigs"; + public static final String TABLE_CONFIGS_URI = "tableConfigsURI"; public static final String SCHEMA = "schema"; public static final String SCHEMA_URI = "schemaURI"; public static final String SEQUENCE_ID = "sequenceId"; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java index d19d306..2edfba6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.apache.pinot.spi.config.table.TableConfig; import 
org.apache.pinot.spi.data.Schema; @@ -36,7 +37,7 @@ public class SegmentGenerationTaskSpec implements Serializable { /** * Table config to create segment */ - private JsonNode _tableConfig; + private TableConfig _tableConfig; /** * Table schema @@ -73,11 +74,11 @@ public class SegmentGenerationTaskSpec implements Serializable { */ private Map<String, String> _customProperties = new HashMap<>(); - public JsonNode getTableConfig() { + public TableConfig getTableConfig() { return _tableConfig; } - public void setTableConfig(JsonNode tableConfig) { + public void setTableConfig(TableConfig tableConfig) { _tableConfig = tableConfig; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org