This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 33028d53943fbfcf934c401aca670ddf61b8593c Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Mon Dec 7 22:40:40 2020 -0800 simplify batch config and corresponding utils --- .../pinot/core/util/TableConfigUtilsTest.java | 19 ++-- .../pinot/spi/filesystem/PinotFSFactory.java | 2 +- .../pinot/spi/ingestion/batch/BatchConfig.java | 82 +++++++------- .../spi/ingestion/batch/BatchConfigProperties.java | 49 ++++++--- .../pinot/spi/utils/IngestionConfigUtils.java | 42 +++++++- .../pinot/spi/ingestion/batch/BatchConfigTest.java | 119 ++++++--------------- 6 files changed, 150 insertions(+), 163 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java index ae313c0..9e1828d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java @@ -377,16 +377,12 @@ public class TableConfigUtilsTest { @Test public void ingestionBatchConfigsTest() { Map<String, String> batchConfigMap = new HashMap<>(); - batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3"); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo"); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar"); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS"); - batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro"); - batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS), - "org.foo.Reader"); + batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo"); + batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar"); + batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.foo.S3FS"); + batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, "org.foo.GcsFS"); + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro"); + batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader"); IngestionConfig ingestionConfig = new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null), @@ -395,8 +391,7 @@ public class TableConfigUtilsTest { new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig) .build(); TableConfigUtils.validateIngestionConfig(tableConfig, null); - - batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE); + batchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI); try { TableConfigUtils.validateIngestionConfig(tableConfig, null); Assert.fail("Should fail for invalid batch config map"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java index b163f5c..6366d95 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java @@ -38,8 +38,8 @@ public class PinotFSFactory { private PinotFSFactory() { } + public static final String LOCAL_PINOT_FS_SCHEME = "file"; private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class); - private static final String LOCAL_PINOT_FS_SCHEME = "file"; private static final String CLASS = "class"; private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() { { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java index 965eb9a..eeb4001 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java @@ -28,62 +28,50 @@ import org.apache.pinot.spi.data.readers.FileFormat; * Provides all config related to the batch data source, as configured in the table config's ingestion config */ public class BatchConfig { + private final Map<String, String> _batchConfigMap; + private final String _tableNameWithType; - private final String _type; private final String _inputDirURI; private final String _outputDirURI; - private final String _fsClassName; - private final Map<String, String> _fsProps = new HashMap<>(); + private final String _inputFsClassName; + private final Map<String, String> _inputFsProps = new HashMap<>(); + private final String _outputFsClassName; + private final Map<String, String> _outputFsProps = new HashMap<>(); private final FileFormat _inputFormat; private final String _recordReaderClassName; private final String _recordReaderConfigClassName; private final Map<String, String> _recordReaderProps = new HashMap<>(); - private final Map<String, String> _batchConfigMap = new HashMap<>(); - public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) { - _tableNameWithType = tableNameWithType; - - _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE); - Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE); + _batchConfigMap = batchConfigsMap; - String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI); - _inputDirURI = batchConfigsMap.get(inputDirURIKey); - Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey); + _tableNameWithType = tableNameWithType; - String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI); - _outputDirURI = batchConfigsMap.get(outputDirURIKey); - Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey); + _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI); + Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_DIR_URI); - String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS); - _fsClassName = batchConfigsMap.get(fsClassNameKey); - Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey); + _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI); + Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.OUTPUT_DIR_URI); - String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT); - String inputFormat = batchConfigsMap.get(inputFormatKey); - Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat); + _inputFsClassName = batchConfigsMap.get(BatchConfigProperties.INPUT_FS_CLASS); + _outputFsClassName = batchConfigsMap.get(BatchConfigProperties.OUTPUT_FS_CLASS); + String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT); + Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT); _inputFormat = FileFormat.valueOf(inputFormat.toUpperCase()); - String recordReaderClassNameKey = - BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS); - _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey); - Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey); - - String recordReaderConfigClassNameKey = - BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS); - _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey); - - String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX); - String recordReaderPropPrefix = - BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX); + _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS); + _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS); for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) { String key = entry.getKey(); - if (key.startsWith(fsPropPrefix)) { - _fsProps.put(key, entry.getValue()); - } else if (key.startsWith(recordReaderPropPrefix)) { + if (key.startsWith(BatchConfigProperties.INPUT_FS_PROP_PREFIX)) { + _inputFsProps.put(key, entry.getValue()); + } + if (key.startsWith(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX)) { + _outputFsProps.put(key, entry.getValue()); + } + if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) { _recordReaderProps.put(key, entry.getValue()); } - _batchConfigMap.put(key, entry.getValue()); } } @@ -91,10 +79,6 @@ public class BatchConfig { return _tableNameWithType; } - public String getType() { - return _type; - } - public String getInputDirURI() { return _inputDirURI; } @@ -103,12 +87,20 @@ public class BatchConfig { return _outputDirURI; } - public String getFsClassName() { - return _fsClassName; + public String getInputFsClassName() { + return _inputFsClassName; + } + + public Map<String, String> getInputFsProps() { + return _inputFsProps; + } + + public String getOutputFsClassName() { + return _outputFsClassName; } - public Map<String, String> getFsProps() { - return _fsProps; + public Map<String, String> getOutputFsProps() { + return _outputFsProps; } public FileFormat getInputFormat() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index 5d276aa..2d83335 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -18,33 +18,52 @@ */ package org.apache.pinot.spi.ingestion.batch; -import org.apache.commons.lang3.StringUtils; - - /** * Defines all the keys used in the batch configs map */ public class BatchConfigProperties { - public static final String DOT_SEPARATOR = "."; - public static final String BATCH_PREFIX = "batch"; + public static final String TABLE_CONFIGS = "tableConfigs"; + public static final String TABLE_NAME = "tableName"; - public static final String BATCH_TYPE = "batchType"; public static final String INPUT_DIR_URI = "inputDirURI"; public static final String OUTPUT_DIR_URI = "outputDirURI"; - public static final String FS_CLASS = "fs.className"; - public static final String FS_PROP_PREFIX = "fs.prop"; + public static final String INPUT_FS_CLASS = "input.fs.className"; + public static final String INPUT_FS_PROP_PREFIX = "input.fs.prop"; + public static final String OUTPUT_FS_CLASS = "output.fs.className"; + public static final String OUTPUT_FS_PROP_PREFIX = "output.fs.prop"; public static final String INPUT_FORMAT = "inputFormat"; + public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern"; + public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern"; public static final String RECORD_READER_CLASS = "recordReader.className"; - public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className"; + public static final String RECORD_READER_CONFIG_CLASS = "recordReader.configClassName"; public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop"; - + public static final String SCHEMA = "schema"; + public static final String SCHEMA_URI = "schemaURI"; + public static final String SEQUENCE_ID = "sequenceId"; + public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type"; + public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs"; + public static final String OVERWRITE_OUTPUT = "overwriteOutput"; public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri"; - /** - * Helper method to create a batch config property - */ - public static String constructBatchProperty(String batchType, String property) { - return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR); + public static final String PUSH_MODE = "push.mode"; + public static final String PUSH_CONTROLLER_URI = "push.controllerUri"; + public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix"; + public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix"; + + public static final String INPUT_FILE_URI = "input.file.uri"; + public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri"; + + public enum SegmentIngestionType { + APPEND, REPLACE + } + + public class SegmentNameGeneratorType { + public static final String SIMPLE = "simple"; + public static final String NORMALIZED_DATE = "normalizedDate"; + public static final String FIXED = "fixed"; } + public enum SegmentPushType { + TAR, URI, METADATA + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 62499dd..84e0dbe 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -19,17 +19,23 @@ package org.apache.pinot.spi.utils; import com.google.common.base.Preconditions; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; /** * Helper methods for extracting fields from IngestionConfig in a backward compatible manner */ public final class IngestionConfigUtils { + public static final String DOT_SEPARATOR = "."; + private static final String DEFAULT_PUSH_MODE = "metadata"; + /** * Fetches the streamConfig from the given realtime table. * First, the ingestionConfigs->stream->streamConfigs will be checked. @@ -42,8 +48,10 @@ public final class IngestionConfigUtils { Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); Map<String, String> streamConfigMap = null; - if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { - List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table"); streamConfigMap = streamConfigMaps.get(0); } @@ -92,4 +100,34 @@ public final class IngestionConfigUtils { return segmentIngestionFrequency; } + public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) { + return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + DOT_SEPARATOR)); + } + + public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) { + Map<String, Object> props = new HashMap<>(); + props.putAll(getConfigMapWithPrefix(batchConfigMap, prefix)); + return props; + } + + public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) { + Map<String, String> props = new HashMap<>(); + for (String configKey : batchConfigMap.keySet()) { + if (configKey.startsWith(prefix)) { + String[] splits = configKey.split(prefix, 2); + if (splits.length > 1) { + props.put(splits[1], batchConfigMap.get(configKey)); + } + } + } + return props; + } + + public static String getPushMode(Map<String, String> batchConfigMap) { + String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE); + if (pushMode == null) { + pushMode = DEFAULT_PUSH_MODE; + } + return pushMode; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java index bc34f1a..cd71177 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java @@ -36,76 +36,56 @@ public class BatchConfigTest { public void testBatchConfig() { Map<String, String> batchConfigMap = new HashMap<>(); String tableName = "foo_REALTIME"; - String batchType = "s3"; String inputDir = "s3://foo/input"; String outputDir = "s3://foo/output"; - String fsClass = "org.apache.S3FS"; + String inputFsClass = "org.apache.S3FS"; + String outputFsClass = "org.apache.GcsGS"; String region = "us-west"; String username = "foo"; + String accessKey = "${ACCESS_KEY}"; + String secretKey = "${SECRET_KEY}"; String inputFormat = "csv"; String recordReaderClass = "org.foo.CSVRecordReader"; String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig"; String separator = "|"; - batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS), - recordReaderClass); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS), - recordReaderConfigClass); - batchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region", - region); - batchConfigMap.put( - BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username", - username); - batchConfigMap.put( - BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX) - + ".separator", separator); + batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir); + batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir); + batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, inputFsClass); + batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, outputFsClass); + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat); + batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass); + batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass); + batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region", region); + batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username", username); + batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region", region); + batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey", accessKey); + batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey", secretKey); + batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator); // config with all the right properties BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap); - assertEquals(batchConfig.getType(), batchType); assertEquals(batchConfig.getInputDirURI(), inputDir); assertEquals(batchConfig.getOutputDirURI(), outputDir); - assertEquals(batchConfig.getFsClassName(), fsClass); + assertEquals(batchConfig.getInputFsClassName(), inputFsClass); + assertEquals(batchConfig.getOutputFsClassName(), outputFsClass); assertEquals(batchConfig.getInputFormat(), FileFormat.CSV); assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass); assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass); - assertEquals(batchConfig.getFsProps().size(), 2); - assertEquals(batchConfig.getFsProps().get( - BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"), - region); - assertEquals(batchConfig.getFsProps().get( - BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"), - username); + assertEquals(batchConfig.getInputFsProps().size(), 2); + assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region"), region); + assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username"), username); + assertEquals(batchConfig.getOutputFsProps().size(), 3); + assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region"), region); + assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey"), accessKey); + assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey"), secretKey); assertEquals(batchConfig.getRecordReaderProps().size(), 1); - assertEquals(batchConfig.getRecordReaderProps().get( - BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX) - + ".separator"), separator); + assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"), + separator); assertEquals(batchConfig.getTableNameWithType(), tableName); // Missing props Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE); - try { - new BatchConfig(tableName, testBatchConfigMap); - Assert.fail("Should fail for missing 'batchType"); - } catch (IllegalStateException e) { - // expected - } - - testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI)); + testBatchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI); try { new BatchConfig(tableName, testBatchConfigMap); Assert.fail("Should fail for missing 'inputDirURI"); @@ -114,8 +94,7 @@ public class BatchConfigTest { } testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI)); + testBatchConfigMap.remove(BatchConfigProperties.OUTPUT_DIR_URI); try { new BatchConfig(tableName, testBatchConfigMap); Assert.fail("Should fail for missing 'outputDirURI"); @@ -124,48 +103,12 @@ public class BatchConfigTest { } testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT)); + testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT); try { new BatchConfig(tableName, testBatchConfigMap); Assert.fail("Should fail for missing 'inputFormat"); } catch (IllegalStateException e) { // expected } - - testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo"); - try { - new BatchConfig(tableName, testBatchConfigMap); - Assert.fail("Should fail for incorrect 'inputFormat"); - } catch (IllegalArgumentException e) { - // expected - } - - testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS)); - try { - new BatchConfig(tableName, testBatchConfigMap); - Assert.fail("Should fail for missing 'recordReaderClassName"); - } catch (IllegalStateException e) { - // expected - } - - testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS)); - new BatchConfig(tableName, testBatchConfigMap); - - testBatchConfigMap = new HashMap<>(batchConfigMap); - testBatchConfigMap - .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS)); - try { - new BatchConfig(tableName, testBatchConfigMap); - Assert.fail("Should fail for missing 'fsClassName"); - } catch (IllegalStateException e) { - // expected - } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org