This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 01b6c22  Support data preprocessing for AVRO and ORC formats (#7062)
01b6c22 is described below

commit 01b6c221059c7d90ff618c47b09ea6db338a0b54
Author: Jialiang Li <j...@linkedin.com>
AuthorDate: Tue Jun 22 15:07:53 2021 -0700

    Support data preprocessing for AVRO and ORC formats (#7062)

    Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
---
 .../v0_deprecated/pinot-hadoop/pom.xml             |   4 +
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 382 +++++++--------------
 .../pinot/hadoop/job/InternalConfigConstants.java  |  13 +-
 .../job/mappers/AvroDataPreprocessingMapper.java   |  85 +++++
 .../job/mappers/OrcDataPreprocessingMapper.java    |  87 +++++
 .../job/mappers/SegmentPreprocessingMapper.java    |  53 ++-
 .../AvroDataPreprocessingPartitioner.java          |  77 +++++
 .../OrcDataPreprocessingPartitioner.java           |  83 +++++
 .../preprocess/AvroDataPreprocessingHelper.java    | 155 +++++++++
 .../job/preprocess/DataPreprocessingHelper.java    | 228 ++++++++++++
 .../preprocess/DataPreprocessingHelperFactory.java |  55 +++
 .../job/preprocess/OrcDataPreprocessingHelper.java | 231 +++++++++++++
 ...ucer.java => AvroDataPreprocessingReducer.java} |  45 +--
 ...ducer.java => OrcDataPreprocessingReducer.java} |  57 +--
 .../hadoop/utils/preprocess/DataFileUtils.java     |  62 ++++
 .../utils/preprocess/DataPreprocessingUtils.java   | 100 ++++++
 .../pinot/hadoop/utils/preprocess/HadoopUtils.java |  41 +++
 .../pinot/hadoop/utils/preprocess/OrcUtils.java    |  88 +++++
 .../hadoop/utils/preprocess/TextComparator.java    |  41 +++
 .../ingestion/jobs/SegmentPreprocessingJob.java    |  26 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   7 +
 pom.xml                                            |   5 +
 22 files changed, 1569 insertions(+), 356 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index 1c21482..339c832 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -200,6 +200,10 @@
       <classifier>hadoop2</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..81a9902 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -27,66 +27,53 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat; -import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; -import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer; +import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper; +import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory; import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper; +import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; -import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; -import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format. + * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format. * Thus, the output files are partitioned, sorted, resized after this job. * In order to run this job, the following configs need to be specified in job properties: * * enable.preprocessing: false by default. Enables preprocessing job. 
*/ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { - private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); - protected FileSystem _fileSystem; + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); + private String _partitionColumn; private int _numPartitions; private String _partitionFunction; - private String _sortedColumn; + + private String _sortingColumn; + private FieldSpec.DataType _sortingColumnType; + private int _numOutputFiles; + private int _maxNumRecordsPerFile; + private TableConfig _tableConfig; private org.apache.pinot.spi.data.Schema _pinotTableSchema; + private Set<DataPreprocessingUtils.Operation> _preprocessingOperations; + public HadoopSegmentPreprocessingJob(final Properties properties) { super(properties); } @@ -94,102 +81,39 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { public void run() throws Exception { if (!_enablePreprocessing) { - _logger.info("Pre-processing job is disabled."); + LOGGER.info("Pre-processing job is disabled."); return; } else { - _logger.info("Starting {}", getClass().getSimpleName()); + LOGGER.info("Starting {}", getClass().getSimpleName()); } - _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf()); - final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir); - Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory."); - - if (_fileSystem.exists(_preprocessedOutputDir)) { - _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir); - _fileSystem.delete(_preprocessedOutputDir, true); - } setTableConfigAndSchema(); - - _logger.info("Initializing a pre-processing job"); - Job job = Job.getInstance(getConf()); - - Path sampleAvroPath = inputDataPaths.get(0); - int numInputPaths = inputDataPaths.size(); - - setValidationConfigs(job, sampleAvroPath); - setHadoopJobConfigs(job, numInputPaths); - - // Avro Schema configs. - Schema avroSchema = getSchema(sampleAvroPath); - _logger.info("Schema is: {}", avroSchema.toString(true)); - setSchemaParams(job, avroSchema); - - // Set up mapper output key - Set<Schema.Field> fieldSet = new HashSet<>(); - + fetchPreProcessingOperations(); fetchPartitioningConfig(); fetchSortingConfig(); fetchResizingConfig(); - validateConfigsAgainstSchema(avroSchema); - - // Partition configs. - int numReduceTasks = 0; - if (_partitionColumn != null) { - numReduceTasks = _numPartitions; - job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); - job.setPartitionerClass(GenericPartitioner.class); - job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); - if (_partitionFunction != null) { - job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); - } - job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks)); - setMaxNumRecordsConfigIfSpecified(job); - } else { - if (_numOutputFiles > 0) { - numReduceTasks = _numOutputFiles; - } else { - // default number of input paths - numReduceTasks = inputDataPaths.size(); - } - // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key. - // so that all the rows can be spread evenly. 
- addHashCodeField(fieldSet); - } - job.setInputFormatClass(CombineAvroKeyInputFormat.class); - - _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); - job.setNumReduceTasks(numReduceTasks); - - // Sort config. - if (_sortedColumn != null) { - _logger.info("Adding sorted column: {} to job config", _sortedColumn); - job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn); - addSortedColumnField(avroSchema, fieldSet); - } else { - // If sorting is disabled, hashcode will be the only factor for sort/group comparator. - addHashCodeField(fieldSet); - } + // Cleans up preprocessed output dir if exists + cleanUpPreprocessedOutputs(_preprocessedOutputDir); - // Creates a wrapper for the schema of output key in mapper. - Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false); - mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet)); - _logger.info("Mapper output schema: {}", mapperOutputKeySchema); + DataPreprocessingHelper dataPreprocessingHelper = + DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + dataPreprocessingHelper + .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, + _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile); - AvroJob.setInputKeySchema(job, avroSchema); - AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema); - AvroJob.setMapOutputValueSchema(job, avroSchema); - AvroJob.setOutputKeySchema(job, avroSchema); + Job job = dataPreprocessingHelper.setUpJob(); // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is // implemented so that you know what it does. 
- _logger.info("HDFS class path: " + _pathToDependencyJar); + LOGGER.info("HDFS class path: " + _pathToDependencyJar); if (_pathToDependencyJar != null) { - _logger.info("Copying jars locally."); - PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar); + LOGGER.info("Copying jars locally."); + PinotHadoopJobPreparationHelper + .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar); } else { - _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR); + LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR); } long startTime = System.currentTimeMillis(); @@ -199,11 +123,28 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { throw new RuntimeException("Job failed : " + job); } - _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); + LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); + } + + private void fetchPreProcessingOperations() { + _preprocessingOperations = new HashSet<>(); + TableCustomConfig customConfig = _tableConfig.getCustomConfig(); + if (customConfig != null) { + Map<String, String> customConfigMap = customConfig.getCustomConfigs(); + if (customConfigMap != null && !customConfigMap.isEmpty()) { + String preprocessingOperationsString = + customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); + DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); + } + } } private void fetchPartitioningConfig() { // Fetch partition info from table config. + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { + LOGGER.info("Partitioning is disabled."); + return; + } SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); if (segmentPartitionConfig != null) { Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); @@ -215,123 +156,96 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); } } else { - _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); + LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); } } private void fetchSortingConfig() { - // Fetch sorting info from table config. + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { + LOGGER.info("Sorting is disabled."); + return; + } + // Fetch sorting info from table config first. + List<String> sortingColumns = new ArrayList<>(); + List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList(); + if (fieldConfigs != null && !fieldConfigs.isEmpty()) { + for (FieldConfig fieldConfig : fieldConfigs) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { + sortingColumns.add(fieldConfig.getName()); + } + } + } + if (!sortingColumns.isEmpty()) { + Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table."); + _sortingColumn = sortingColumns.get(0); + return; + } + + // There is no sorted column specified in field configs, try to find sorted column from indexing config. 
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); List<String> sortedColumns = indexingConfig.getSortedColumn(); if (sortedColumns != null) { Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); if (sortedColumns.size() == 1) { - _sortedColumn = sortedColumns.get(0); + _sortingColumn = sortedColumns.get(0); + FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); + Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); + Preconditions + .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); + _sortingColumnType = fieldSpec.getDataType(); + Preconditions + .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, + _sortingColumn); + LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); } } } private void fetchResizingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { + LOGGER.info("Resizing is disabled."); + return; + } TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); if (tableCustomConfig == null) { _numOutputFiles = 0; return; } Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESS_NUM_FILES)) { - _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESS_NUM_FILES)); + if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { + _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); Preconditions.checkState(_numOutputFiles > 0, String - .format("The value of %s should be positive! Current value: %s", InternalConfigConstants.PREPROCESS_NUM_FILES, - _numOutputFiles)); + .format("The value of %s should be positive! Current value: %s", + InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); } else { _numOutputFiles = 0; } - } - private void setMaxNumRecordsConfigIfSpecified(Job job) { - TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); - if (tableCustomConfig == null) { - return; - } - Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap - .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { - int maxNumRecords = - Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); + if (customConfigsMap != null) { + int maxNumRecords; + if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { + LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); + maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); + } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { + maxNumRecords = + Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); + } else { + return; + } + // TODO: add a in-built maximum value for this config to avoid having too many small files. + // E.g. 
if the config is set to 1 which is smaller than this in-built value, the job should be abort from generating too many small files. Preconditions.checkArgument(maxNumRecords > 0, - "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE + "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE + " should be positive. Current value: " + maxNumRecords); - _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords); - job.getConfiguration() - .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords)); + LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); + _maxNumRecordsPerFile = maxNumRecords; } } - /** - * Finds the avro file in the input folder, and returns its avro schema - * @param inputPathDir Path to input directory - * @return Input schema - * @throws IOException exception when accessing to IO - */ - private Schema getSchema(Path inputPathDir) - throws IOException { - FileSystem fs = FileSystem.get(new Configuration()); - Schema avroSchema = null; - for (FileStatus fileStatus : fs.listStatus(inputPathDir)) { - if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) { - _logger.info("Extracting schema from " + fileStatus.getPath()); - try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) { - avroSchema = dataStreamReader.getSchema(); - } - break; - } - } - return avroSchema; - } - - private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) { - // Sorting is enabled. Adding sorted column value to mapper output key. - Schema sortedColumnSchema = schema.getField(_sortedColumn).schema(); - Schema sortedColumnAsKeySchema; - if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) { - sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes()); - } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) { - sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType()); - } else { - sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType()); - } - Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null); - fieldSet.add(columnField); - } - - private void validateConfigsAgainstSchema(Schema schema) { - if (_partitionColumn != null) { - Preconditions.checkArgument(schema.getField(_partitionColumn) != null, - String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn)); - Preconditions.checkArgument(_numPartitions > 0, - String.format("Number of partitions should be positive. 
Current value: %s", _numPartitions)); - Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!"); - try { - PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction); - } catch (IllegalArgumentException e) { - _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}", - _partitionColumn); - throw new IllegalArgumentException(e); - } - } - if (_sortedColumn != null) { - Preconditions.checkArgument(schema.getField(_sortedColumn) != null, - String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn)); - } - } - - private void addHashCodeField(Set<Schema.Field> fieldSet) { - Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null); - fieldSet.add(hashCodeField); - } - @Override - protected org.apache.pinot.spi.data.Schema getSchema() + protected Schema getSchema() throws IOException { try (ControllerRestApi controllerRestApi = getControllerRestApi()) { if (controllerRestApi != null) { @@ -360,74 +274,14 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); } - private void setValidationConfigs(Job job, Path path) + /** + * Cleans up outputs in preprocessed output directory. + */ + public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir) throws IOException { - SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); - - // TODO: Serialize and deserialize validation config by creating toJson and fromJson - // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, - // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name - // and value - if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { - job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); - String timeColumnName = validationConfig.getTimeColumnName(); - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); - if (timeColumnName != null) { - DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); - if (dateTimeFieldSpec != null) { - DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); - } - } - job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, - IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); - try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(path)) { - job.getConfiguration() - .set(InternalConfigConstants.TIME_COLUMN_VALUE, dataStreamReader.next().get(timeColumnName).toString()); - } - } - } - - private void setHadoopJobConfigs(Job job, int numInputPaths) { - job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); - // Turn this on to always firstly use class paths that user specifies. 
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); - // Turn this off since we don't need an empty file in the output directory - job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); - - job.setJarByClass(HadoopSegmentPreprocessingJob.class); - - String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (hadoopTokenFileLocation != null) { - job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); + if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) { + LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir); + HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true); } - - // Mapper configs. - job.setMapperClass(SegmentPreprocessingMapper.class); - job.setMapOutputKeyClass(AvroKey.class); - job.setMapOutputValueClass(AvroValue.class); - job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths); - - // Reducer configs. - job.setReducerClass(SegmentPreprocessingReducer.class); - job.setOutputKeyClass(AvroKey.class); - job.setOutputValueClass(NullWritable.class); - } - - private void setSchemaParams(Job job, Schema avroSchema) - throws IOException { - AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); - AvroMultipleOutputs.setCountersEnabled(job, true); - // Use LazyOutputFormat to avoid creating empty files. - LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); - - // Input and output paths. - FileInputFormat.setInputPaths(job, _inputSegmentDir); - FileOutputFormat.setOutputPath(job, _preprocessedOutputDir); } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java index 3a7938d..3701db2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java @@ -31,19 +31,26 @@ public class InternalConfigConstants { public static final String SEGMENT_TIME_FORMAT = "segment.time.format"; public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern"; + // The operations of preprocessing that is enabled. + public static final String PREPROCESS_OPERATIONS = "preprocessing.operations"; + // Partitioning configs public static final String PARTITION_COLUMN_CONFIG = "partition.column"; public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; public static final String PARTITION_FUNCTION_CONFIG = "partition.function"; - public static final String SORTED_COLUMN_CONFIG = "sorted.column"; + public static final String SORTING_COLUMN_CONFIG = "sorting.column"; + public static final String SORTING_COLUMN_TYPE = "sorting.type"; public static final String ENABLE_PARTITIONING = "enable.partitioning"; - // max records per file in each partition. No effect otherwise. + @Deprecated + // Use PREPROCESSING_MAX_NUM_RECORDS_PER_FILE. public static final String PARTITION_MAX_RECORDS_PER_FILE = "partition.max.records.per.file"; + // max records per file in each partition. No effect otherwise. 
+ public static final String PREPROCESSING_MAX_NUM_RECORDS_PER_FILE = "preprocessing.max.num.records.per.file"; // Number of segments we want generated. - public static final String PREPROCESS_NUM_FILES = "preprocess.num.files"; + public static final String PREPROCESSING_NUM_REDUCERS = "preprocessing.num.reducers"; public static final String FAIL_ON_SCHEMA_MISMATCH = "fail.on.schema.mismatch"; } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java new file mode 100644 index 0000000..6278e8e --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.mappers; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class); + + private String _sortingColumn = null; + private FieldSpec.DataType _sortingColumnType = null; + private AvroRecordExtractor _avroRecordExtractor; + + @Override + public void setup(Context context) { + Configuration configuration = context.getConfiguration(); + _avroRecordExtractor = new AvroRecordExtractor(); + String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); + if (sortingColumnConfig != null) { + _sortingColumn = sortingColumnConfig; + _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE)); + LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn, + _sortingColumnType); + } else { + LOGGER.info("Initialized AvroDataPreprocessingMapper without 
sorting column"); + } + } + + @Override + public void map(AvroKey<GenericRecord> key, NullWritable value, Context context) + throws IOException, InterruptedException { + GenericRecord record = key.datum(); + if (_sortingColumn != null) { + Object object = record.get(_sortingColumn); + Preconditions + .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn, + record); + Object convertedValue = _avroRecordExtractor.convert(object); + Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object, + _sortingColumn, record); + WritableComparable outputKey; + try { + outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record), + e); + } + context.write(outputKey, new AvroValue<>(record)); + } else { + context.write(NullWritable.get(), new AvroValue<>(record)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java new file mode 100644 index 0000000..d7d0694 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.mappers; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class); + + private final OrcValue _valueWrapper = new OrcValue(); + private String _sortingColumn = null; + private FieldSpec.DataType _sortingColumnType = null; + private int _sortingColumnId = -1; + + @Override + public void setup(Context context) { + Configuration configuration = context.getConfiguration(); + String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); + if (sortingColumnConfig != null) { + _sortingColumn = sortingColumnConfig; + _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE)); + LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn, + _sortingColumnType); + } else { + LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column"); + } + } + + @Override + public void map(NullWritable key, OrcStruct value, Context context) + throws IOException, InterruptedException { + _valueWrapper.value = value; + if (_sortingColumn != null) { + if (_sortingColumnId == -1) { + List<String> fieldNames = value.getSchema().getFieldNames(); + _sortingColumnId = fieldNames.indexOf(_sortingColumn); + Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s", + _sortingColumn, fieldNames); + LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId); + } + WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId); + WritableComparable outputKey; + try { + outputKey = DataPreprocessingUtils + .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType); + } catch (Exception e) { + throw new IllegalStateException(String + .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn, + _sortingColumnId, value), e); + } + context.write(outputKey, _valueWrapper); + } else { + context.write(NullWritable.get(), _valueWrapper); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java index e3f088d..3d3fcec 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java @@ -20,6 
+20,7 @@ package org.apache.pinot.hadoop.job.mappers; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class); + private Configuration _jobConf; private String _sortedColumn = null; private String _timeColumn = null; private Schema _outputKeySchema; @@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N @Override public void setup(final Context context) { - Configuration configuration = context.getConfiguration(); - - String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME); - - _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true"); + _jobConf = context.getConfiguration(); + logConfigurations(); + String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME); + _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND)); if (_isAppend) { // Get time column name - _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG); + _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG); // Get sample time column value - String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE); - String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY); + String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE); + String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY); - String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE); - String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT); + String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE); + String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT); DateTimeFormatSpec dateTimeFormatSpec; if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) { dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat); } else { dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat, - configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN)); + _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN)); } _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec); _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue); } - String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG); + String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); // Logging the configs for the mapper LOGGER.info("Sorted Column: " + sortedColumn); if (sortedColumn != null) { _sortedColumn = sortedColumn; } - _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration); - _outputSchema = AvroJob.getMapOutputValueSchema(configuration); - _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false")); + _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf); + _outputSchema = 
AvroJob.getMapOutputValueSchema(_jobConf); + _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false")); } @Override @@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N throw e; } } + + protected void logConfigurations() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append('{'); + boolean firstEntry = true; + for (Map.Entry<String, String> entry : _jobConf) { + if (!firstEntry) { + stringBuilder.append(", \n"); + } else { + firstEntry = false; + } + + stringBuilder.append(entry.getKey()); + stringBuilder.append('='); + stringBuilder.append(entry.getValue()); + } + stringBuilder.append('}'); + + LOGGER.info("*********************************************************************"); + LOGGER.info("Job Configurations: {}", stringBuilder.toString()); + LOGGER.info("*********************************************************************"); + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java new file mode 100644 index 0000000..74799c7 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.partitioners; + +import com.google.common.base.Preconditions; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class); + + private Configuration _conf; + private String _partitionColumn; + private PartitionFunction _partitionFunction; + private AvroRecordExtractor _avroRecordExtractor; + + @Override + public void setConf(Configuration conf) { + _conf = conf; + _avroRecordExtractor = new AvroRecordExtractor(); + _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG); + String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG); + int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG)); + _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions); + LOGGER.info( + "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}", + _partitionColumn, partitionFunctionName, numPartitions); + } + + @Override + public Configuration getConf() { + return _conf; + } + + @Override + public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) { + GenericRecord record = value.datum(); + Object object = record.get(_partitionColumn); + Preconditions + .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn, + record); + Object convertedValue = _avroRecordExtractor.convert(object); + Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object, + _partitionColumn, record); + Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String, + "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn, + convertedValue.getClass(), record); + // NOTE: Always partition with String type value because Broker uses String type value to prune segments + return _partitionFunction.getPartition(convertedValue.toString()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java new file mode 100644 index 0000000..bef2cef --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.partitioners; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class); + + private Configuration _conf; + private String _partitionColumn; + private PartitionFunction _partitionFunction; + private int _partitionColumnId = -1; + + @Override + public void setConf(Configuration conf) { + _conf = conf; + _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG); + String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG); + int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG)); + _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions); + LOGGER.info( + "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}", + _partitionColumn, partitionFunctionName, numPartitions); + } + + @Override + public Configuration getConf() { + return _conf; + } + + @Override + public int getPartition(WritableComparable key, OrcValue value, int numPartitions) { + OrcStruct orcStruct = (OrcStruct) value.value; + if (_partitionColumnId == -1) { + List<String> fieldNames = orcStruct.getSchema().getFieldNames(); + _partitionColumnId = fieldNames.indexOf(_partitionColumn); + Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s", + _partitionColumn, fieldNames); + LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId); + } + WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId); + Object convertedValue; + try { + convertedValue = OrcUtils.convert(partitionColumnValue); + } catch (Exception e) { + throw new IllegalStateException(String + .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn, + _partitionColumnId, orcStruct), e); + } + // NOTE: Always partition with String type value because Broker uses String type value to prune segments + return 
_partitionFunction.getPartition(convertedValue.toString()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java new file mode 100644 index 0000000..9e5f5f2 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import java.util.zip.GZIPInputStream; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.avro.mapreduce.AvroMultipleOutputs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner; +import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer; +import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AvroDataPreprocessingHelper extends DataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingHelper.class); + + public AvroDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) { + super(inputDataPaths, outputPath); + } + + @Override + public Class<? 
extends Partitioner> getPartitioner() { + return AvroDataPreprocessingPartitioner.class; + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Schema avroSchema = getAvroSchema(_sampleRawDataPath); + LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); + validateConfigsAgainstSchema(avroSchema); + + job.setInputFormatClass(AvroKeyInputFormat.class); + job.setMapperClass(AvroDataPreprocessingMapper.class); + + job.setReducerClass(AvroDataPreprocessingReducer.class); + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroJob.setInputKeySchema(job, avroSchema); + AvroJob.setMapOutputValueSchema(job, avroSchema); + AvroJob.setOutputKeySchema(job, avroSchema); + } + + @Override + String getSampleTimeColumnValue(String timeColumnName) + throws IOException { + String sampleTimeColumnValue; + try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) { + sampleTimeColumnValue = dataStreamReader.next().get(timeColumnName).toString(); + } + return sampleTimeColumnValue; + } + + /** + * Finds the avro file in the input folder, and returns its avro schema + * @param inputPathDir Path to input directory + * @return Input schema + * @throws IOException exception when accessing to IO + */ + private Schema getAvroSchema(Path inputPathDir) + throws IOException { + Schema avroSchema = null; + for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) { + if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) { + LOGGER.info("Extracting schema from " + fileStatus.getPath()); + try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) { + avroSchema = dataStreamReader.getSchema(); + } + break; + } + } + return avroSchema; + } + + /** + * Helper method that returns avro reader for the given avro file. + * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader. + * + * @param avroFile File to read + * @return Avro reader for the file. + * @throws IOException exception when accessing to IO + */ + private DataFileStream<GenericRecord> getAvroReader(Path avroFile) + throws IOException { + FileSystem fs = FileSystem.get(new Configuration()); + if (avroFile.getName().endsWith("gz")) { + return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>()); + } else { + return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>()); + } + } + + private void validateConfigsAgainstSchema(Schema schema) { + if (_partitionColumn != null) { + Preconditions.checkArgument(schema.getField(_partitionColumn) != null, + String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn)); + Preconditions.checkArgument(_numPartitions > 0, + String.format("Number of partitions should be positive. 
Current value: %s", _numPartitions)); + Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!"); + try { + PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction); + } catch (IllegalArgumentException e) { + LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray or HashCode, but it is currently: {}", + _partitionFunction); + throw new IllegalArgumentException(e); + } + } + if (_sortingColumn != null) { + Preconditions.checkArgument(schema.getField(_sortingColumn) != null, + String.format("Sorted column: %s is not found in the schema of input files.", _sortingColumn)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java new file mode 100644 index 0000000..a505d09 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; +import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.hadoop.utils.preprocess.TextComparator; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class DataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); + + String _partitionColumn; + int _numPartitions; + String _partitionFunction; + + String _sortingColumn; + private FieldSpec.DataType _sortingColumnType; + + private int _numOutputFiles; + private int _maxNumRecordsPerFile; + + private TableConfig _tableConfig; + private Schema _pinotTableSchema; + + List<Path> _inputDataPaths; + Path _sampleRawDataPath; + Path _outputPath; + + public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) { + _inputDataPaths = inputDataPaths; + _sampleRawDataPath = inputDataPaths.get(0); + _outputPath = outputPath; + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _tableConfig = tableConfig; + _pinotTableSchema = tableSchema; + _partitionColumn = partitionColumn; + _numPartitions = numPartitions; + _partitionFunction = partitionFunction; + + _sortingColumn = sortingColumn; + _sortingColumnType = sortingColumnType; + + _numOutputFiles = numOutputFiles; + _maxNumRecordsPerFile = maxNumRecordsPerFile; + } + + public Job setUpJob() + throws IOException { + LOGGER.info("Initializing a pre-processing job"); + Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); + Configuration jobConf = job.getConfiguration(); + // Input and output paths. 
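+ // (Editor's note, not part of the original patch: the number of map tasks is hinted to match the number of
+ // input data files below, and the output directory is attached later in setHadoopJobConfigs(). A typical
+ // caller is expected to obtain a helper from DataPreprocessingHelperFactory.generateDataPreprocessingHelper(
+ // inputDir, outputDir), call registerConfigs(...) with the table config, schema and partition/sort/resize
+ // settings, and then submit the Job returned here, e.g. via setUpJob().waitForCompletion(true).)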
+ int numInputPaths = _inputDataPaths.size(); + jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); + setValidationConfigs(job, _sampleRawDataPath); + for (Path inputFile : _inputDataPaths) { + FileInputFormat.addInputPath(job, inputFile); + } + setHadoopJobConfigs(job); + + // Sorting column + if (_sortingColumn != null) { + LOGGER.info("Adding sorting column: {} to job config", _sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name()); + + switch (_sortingColumnType) { + case INT: + job.setMapOutputKeyClass(IntWritable.class); + break; + case LONG: + job.setMapOutputKeyClass(LongWritable.class); + break; + case FLOAT: + job.setMapOutputKeyClass(FloatWritable.class); + break; + case DOUBLE: + job.setMapOutputKeyClass(DoubleWritable.class); + break; + case STRING: + job.setMapOutputKeyClass(Text.class); + job.setSortComparatorClass(TextComparator.class); + break; + default: + throw new IllegalStateException(); + } + } else { + job.setMapOutputKeyClass(NullWritable.class); + } + + // Partition column + int numReduceTasks = 0; + if (_partitionColumn != null) { + numReduceTasks = _numPartitions; + jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); + job.setPartitionerClass(GenericPartitioner.class); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); + if (_partitionFunction != null) { + jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); + } + jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); + job.setPartitionerClass(getPartitioner()); + } else { + if (_numOutputFiles > 0) { + numReduceTasks = _numOutputFiles; + } else { + // default number of input paths + numReduceTasks = _inputDataPaths.size(); + } + } + // Maximum number of records per output file + jobConf + .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile)); + // Number of reducers + LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); + job.setNumReduceTasks(numReduceTasks); + + setUpMapperReducerConfigs(job); + + return job; + } + + abstract Class<? extends Partitioner> getPartitioner(); + + abstract void setUpMapperReducerConfigs(Job job) + throws IOException; + + abstract String getSampleTimeColumnValue(String timeColumnName) + throws IOException; + + private void setValidationConfigs(Job job, Path path) + throws IOException { + SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); + + // TODO: Serialize and deserialize validation config by creating toJson and fromJson + // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, + // the job should be disabled, as we should not resize for these use cases. 
Therefore, setting the time column name + // and value + if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { + job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); + String timeColumnName = validationConfig.getTimeColumnName(); + job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); + if (timeColumnName != null) { + DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); + if (dateTimeFieldSpec != null) { + DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); + job.getConfiguration() + .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); + } + } + job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, + IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); + + String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); + if (sampleTimeColumnValue != null) { + job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); + } + } + } + + private void setHadoopJobConfigs(Job job) { + job.setJarByClass(HadoopSegmentPreprocessingJob.class); + job.setJobName(getClass().getName()); + FileOutputFormat.setOutputPath(job, _outputPath); + job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); + // Turn this on to always firstly use class paths that user specifies. + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); + // Turn this off since we don't need an empty file in the output directory + job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); + + String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + if (hadoopTokenFileLocation != null) { + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java new file mode 100644 index 0000000..2e91773 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DataPreprocessingHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class); + + public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + throws IOException { + final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); + final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles, + inputPaths); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + inputPaths); + + if (numAvroFiles > 0) { + LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); + return new AvroDataPreprocessingHelper(avroFiles, outputPath); + } else { + LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); + return new OrcDataPreprocessingHelper(orcFiles, outputPath); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java new file mode 100644 index 0000000..aec0bb0 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.orc.mapreduce.OrcInputFormat; +import org.apache.orc.mapreduce.OrcOutputFormat; +import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner; +import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer; +import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; +import org.apache.pinot.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcDataPreprocessingHelper extends DataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingHelper.class); + + public OrcDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) { + super(inputDataPaths, outputPath); + } + + @Override + Class<? extends Partitioner> getPartitioner() { + return OrcDataPreprocessingPartitioner.class; + } + + @Override + void setUpMapperReducerConfigs(Job job) { + TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath); + String orcSchemaString = orcSchema.toString(); + LOGGER.info("Orc schema is: {}", orcSchemaString); + validateConfigsAgainstSchema(orcSchema); + + job.setInputFormatClass(OrcInputFormat.class); + job.setMapperClass(OrcDataPreprocessingMapper.class); + job.setMapOutputValueClass(OrcValue.class); + Configuration jobConf = job.getConfiguration(); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + + job.setReducerClass(OrcDataPreprocessingReducer.class); + // Use LazyOutputFormat to avoid creating empty files. 
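+ // (Editor's note: with a plain output format each reducer would create an output file even when it receives no
+ // records; LazyOutputFormat defers file creation until the first record is actually written.)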
+ LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcStruct.class); + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + } + + @Override + String getSampleTimeColumnValue(String timeColumnName) + throws IOException { + try (Reader reader = OrcFile + .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { + Reader.Options options = new Reader.Options(); + options.range(0, 1); + RecordReader records = reader.rows(options); + TypeDescription orcSchema = reader.getSchema(); + VectorizedRowBatch vectorizedRowBatch = orcSchema.createRowBatch(); + + if (records.nextBatch(vectorizedRowBatch)) { + List<String> orcFields = orcSchema.getFieldNames(); + int numFields = orcFields.size(); + for (int i = 0; i < numFields; i++) { + String fieldName = orcFields.get(i); + if (timeColumnName.equals(fieldName)) { + ColumnVector columnVector = vectorizedRowBatch.cols[i]; + TypeDescription fieldType = orcSchema.getChildren().get(i); + TypeDescription.Category category = fieldType.getCategory(); + return getValue(fieldName, columnVector, category); + } + } + } + } + return null; + } + + private String getValue(String field, ColumnVector columnVector, TypeDescription.Category category) { + switch (category) { + case BOOLEAN: + LongColumnVector longColumnVector = (LongColumnVector) columnVector; + if (longColumnVector.noNulls || !longColumnVector.isNull[0]) { + return Boolean.toString(longColumnVector.vector[0] == 1); + } else { + return null; + } + case BYTE: + case SHORT: + case INT: + // Extract to Integer + longColumnVector = (LongColumnVector) columnVector; + if (longColumnVector.noNulls || !longColumnVector.isNull[0]) { + return Integer.toString((int) longColumnVector.vector[0]); + } else { + return null; + } + case LONG: + case DATE: + // Extract to Long + longColumnVector = (LongColumnVector) columnVector; + if (longColumnVector.noNulls || !longColumnVector.isNull[0]) { + return Long.toString(longColumnVector.vector[0]); + } else { + return null; + } + case TIMESTAMP: + // Extract to Long + TimestampColumnVector timestampColumnVector = (TimestampColumnVector) columnVector; + if (timestampColumnVector.noNulls || !timestampColumnVector.isNull[0]) { + return Long.toString(timestampColumnVector.time[0]); + } else { + return null; + } + case FLOAT: + // Extract to Float + DoubleColumnVector doubleColumnVector = (DoubleColumnVector) columnVector; + if (doubleColumnVector.noNulls || !doubleColumnVector.isNull[0]) { + return Float.toString((float) doubleColumnVector.vector[0]); + } else { + return null; + } + case DOUBLE: + // Extract to Double + doubleColumnVector = (DoubleColumnVector) columnVector; + if (doubleColumnVector.noNulls || !doubleColumnVector.isNull[0]) { + return Double.toString(doubleColumnVector.vector[0]); + } else { + return null; + } + case STRING: + case VARCHAR: + case CHAR: + // Extract to String + BytesColumnVector bytesColumnVector = (BytesColumnVector) columnVector; + if (bytesColumnVector.noNulls || !bytesColumnVector.isNull[0]) { + int length = bytesColumnVector.length[0]; + return StringUtils.decodeUtf8(bytesColumnVector.vector[0], bytesColumnVector.start[0], length); + } else { + return null; + } + case BINARY: + // Extract to byte[] + bytesColumnVector = (BytesColumnVector) columnVector; + if (bytesColumnVector.noNulls || !bytesColumnVector.isNull[0]) { + int length = bytesColumnVector.length[0]; + byte[] bytes = new byte[length]; + 
System.arraycopy(bytesColumnVector.vector[0], bytesColumnVector.start[0], bytes, 0, length); + return new String(bytes, StandardCharsets.UTF_8); + } else { + return null; + } + default: + // Unsupported types + throw new IllegalStateException("Unsupported field type: " + category + " for field: " + field); + } + } + + /** + * Finds the ORC file and returns its ORC schema. + */ + private TypeDescription getOrcSchema(Path orcFile) { + TypeDescription orcSchema; + try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { + orcSchema = reader.getSchema(); + } catch (Exception e) { + throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e); + } + return orcSchema; + } + + private void validateConfigsAgainstSchema(TypeDescription schema) { + List<String> fieldNames = schema.getFieldNames(); + if (_partitionColumn != null) { + Preconditions.checkArgument(fieldNames.contains(_partitionColumn), + String.format("Partition column: %s is not found in the schema of input files.", _partitionColumn)); + Preconditions.checkArgument(_numPartitions > 0, + String.format("Number of partitions should be positive. Current value: %s", _numPartitions)); + Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!"); + try { + PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction); + } catch (IllegalArgumentException e) { + LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray or HashCode, but it is currently: {}", + _partitionFunction); + throw new IllegalArgumentException(e); + } + } + if (_sortingColumn != null) { + Preconditions.checkArgument(fieldNames.contains(_sortingColumn), + String.format("Sorted column: %s is not found in the schema of input files.", _sortingColumn)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java similarity index 63% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java index 9c07e6f..5fcbf10 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java @@ -19,7 +19,6 @@ package org.apache.pinot.hadoop.job.reducers; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; @@ -33,33 +32,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class); +public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>,
NullWritable> { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class); private AvroMultipleOutputs _multipleOutputs; - private AtomicInteger _counter; - private int _maxNumberOfRecords; + private long _numRecords; + private int _maxNumRecordsPerFile; private String _filePrefix; @Override public void setup(Context context) { - LOGGER.info("Using multiple outputs strategy."); Configuration configuration = context.getConfiguration(); - _multipleOutputs = new AvroMultipleOutputs(context); - _counter = new AtomicInteger(); // If it's 0, the output file won't be split into multiple files. // If not, output file will be split when the number of records reaches this number. - _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0); - LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords); - _filePrefix = RandomStringUtils.randomAlphanumeric(4); + _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0); + if (_maxNumRecordsPerFile > 0) { + LOGGER.info("Using multiple outputs strategy."); + _multipleOutputs = new AvroMultipleOutputs(context); + _numRecords = 0L; + _filePrefix = RandomStringUtils.randomAlphanumeric(4); + LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile); + } else { + LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile"); + } } @Override public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context) throws IOException, InterruptedException { - for (final AvroValue<GenericRecord> value : values) { - String fileName = generateFileName(); - _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName); + if (_maxNumRecordsPerFile > 0) { + for (final AvroValue<GenericRecord> value : values) { + String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile); + _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName); + } + } else { + for (final AvroValue<GenericRecord> value : values) { + context.write(new AvroKey<>(value.datum()), NullWritable.get()); + } } } @@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic } LOGGER.info("Finished cleaning up reducer."); } - - private String generateFileName() { - if (_maxNumberOfRecords == 0) { - return _filePrefix; - } else { - return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords); - } - } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java similarity index 54% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java index 9c07e6f..a3387a2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java @@ -19,47 +19,56 @@ package org.apache.pinot.hadoop.job.reducers; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapred.AvroValue; -import org.apache.avro.mapreduce.AvroMultipleOutputs; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; import org.apache.pinot.hadoop.job.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class); +public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class); - private AvroMultipleOutputs _multipleOutputs; - private AtomicInteger _counter; - private int _maxNumberOfRecords; + private int _maxNumRecordsPerFile; + private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs; + private long _numRecords; private String _filePrefix; @Override public void setup(Context context) { - LOGGER.info("Using multiple outputs strategy."); Configuration configuration = context.getConfiguration(); - _multipleOutputs = new AvroMultipleOutputs(context); - _counter = new AtomicInteger(); // If it's 0, the output file won't be split into multiple files. // If not, output file will be split when the number of records reaches this number. 
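+ // (Editor's note: when the cap below is positive, each reducer writes files named <randomPrefix><n> through
+ // MultipleOutputs, rolling to a new file every _maxNumRecordsPerFile records; otherwise it writes straight to
+ // the regular job output.)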
- _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0); - LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords); - _filePrefix = RandomStringUtils.randomAlphanumeric(4); + _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0); + if (_maxNumRecordsPerFile > 0) { + LOGGER.info("Using multiple outputs strategy."); + _multipleOutputs = new MultipleOutputs<>(context); + _numRecords = 0L; + _filePrefix = RandomStringUtils.randomAlphanumeric(4); + LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile); + } else { + LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile"); + } } @Override - public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context) + public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context) throws IOException, InterruptedException { - for (final AvroValue<GenericRecord> value : values) { - String fileName = generateFileName(); - _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName); + if (_maxNumRecordsPerFile > 0) { + for (final OrcValue value : values) { + String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile); + _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName); + } + } else { + for (final OrcValue value : values) { + context.write(NullWritable.get(), (OrcStruct) value.value); + } } } @@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic } LOGGER.info("Finished cleaning up reducer."); } - - private String generateFileName() { - if (_maxNumberOfRecords == 0) { - return _filePrefix; - } else { - return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords); - } - } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java new file mode 100644 index 0000000..58e1c1d --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.utils.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + + +public class DataFileUtils { + private DataFileUtils() { + } + + public static final String AVRO_FILE_EXTENSION = ".avro"; + public static final String ORC_FILE_EXTENSION = ".orc"; + + /** + * Returns the data files under the input directory with the given file extension. + */ + public static List<Path> getDataFiles(Path inputDir, String dataFileExtension) + throws IOException { + FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir); + Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir); + List<Path> dataFiles = new ArrayList<>(); + getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles); + return dataFiles; + } + + private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles) + throws IOException { + for (FileStatus fileStatus : fileStatuses) { + Path path = fileStatus.getPath(); + if (fileStatus.isDirectory()) { + getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles); + } else { + Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path); + if (path.getName().endsWith(dataFileExtension)) { + dataFiles.add(path); + } + } + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java new file mode 100644 index 0000000..bf399af --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.utils.preprocess; + +import java.util.Set; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.pinot.spi.data.FieldSpec; + + +public class DataPreprocessingUtils { + private DataPreprocessingUtils() { + } + + /** + * Converts a value into {@link WritableComparable} based on the given data type. + * <p>NOTE: The passed in value must be either a Number or a String. 
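+ * <p>Editor's illustrative example (not part of the original patch): the String "42" with data type INT becomes an
+ * IntWritable wrapping 42, while the Integer 42 with data type STRING becomes a Text containing "42".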
+ */ + public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) { + if (value instanceof Number) { + Number numberValue = (Number) value; + switch (dataType) { + case INT: + return new IntWritable(numberValue.intValue()); + case LONG: + return new LongWritable(numberValue.longValue()); + case FLOAT: + return new FloatWritable(numberValue.floatValue()); + case DOUBLE: + return new DoubleWritable(numberValue.doubleValue()); + case STRING: + return new Text(numberValue.toString()); + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } else if (value instanceof String) { + String stringValue = (String) value; + switch (dataType) { + case INT: + return new IntWritable(Integer.parseInt(stringValue)); + case LONG: + return new LongWritable(Long.parseLong(stringValue)); + case FLOAT: + return new FloatWritable(Float.parseFloat(stringValue)); + case DOUBLE: + return new DoubleWritable(Double.parseDouble(stringValue)); + case STRING: + return new Text(stringValue); + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } else { + throw new IllegalArgumentException( + String.format("Value: %s must be either a Number or a String, found: %s", value, value.getClass())); + } + } + + public static Set<Operation> getOperations(Set<Operation> operationSet, String preprocessingOperationsString) { + String[] preprocessingOpsArray = preprocessingOperationsString.split(","); + for (String preprocessingOps : preprocessingOpsArray) { + operationSet.add(Operation.getOperation(preprocessingOps.trim().toUpperCase())); + } + return operationSet; + } + + public enum Operation { + PARTITION, + SORT, + RESIZE; + + public static Operation getOperation(String operationString) { + for (Operation operation : Operation.values()) { + if (operation.name().equals(operationString)) { + return operation; + } + } + throw new IllegalArgumentException("Unsupported data preprocessing operation: " + operationString); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java new file mode 100644 index 0000000..0596259 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.utils.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + + +public class HadoopUtils { + private HadoopUtils() { + } + + public static final Configuration DEFAULT_CONFIGURATION; + public static final FileSystem DEFAULT_FILE_SYSTEM; + + static { + DEFAULT_CONFIGURATION = new Configuration(); + try { + DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); + } catch (IOException e) { + throw new IllegalStateException("Failed to get the default file system", e); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java new file mode 100644 index 0000000..dcfc3b5 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.utils.preprocess; + +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.orc.mapred.OrcTimestamp; + + +public class OrcUtils { + private OrcUtils() { + } + + /** + * Converts the ORC value into Number or String. 
+ * <p>The following ORC types are supported: + * <ul> + * <li>IntWritable -> Integer</li> + * <li>LongWritable -> Long</li> + * <li>FloatWritable -> Float</li> + * <li>DoubleWritable -> Double</li> + * <li>Text -> String</li> + * <li>BooleanWritable -> String</li> + * <li>ByteWritable -> Byte</li> + * <li>ShortWritable -> Short</li> + * <li>DateWritable -> Long</li> + * <li>OrcTimestamp -> Long</li> + * </ul> + */ + public static Object convert(WritableComparable orcValue) { + if (orcValue instanceof IntWritable) { + return ((IntWritable) orcValue).get(); + } + if (orcValue instanceof LongWritable) { + return ((LongWritable) orcValue).get(); + } + if (orcValue instanceof FloatWritable) { + return ((FloatWritable) orcValue).get(); + } + if (orcValue instanceof DoubleWritable) { + return ((DoubleWritable) orcValue).get(); + } + if (orcValue instanceof Text) { + return orcValue.toString(); + } + if (orcValue instanceof BooleanWritable) { + return Boolean.toString(((BooleanWritable) orcValue).get()); + } + if (orcValue instanceof ByteWritable) { + return ((ByteWritable) orcValue).get(); + } + if (orcValue instanceof ShortWritable) { + return ((ShortWritable) orcValue).get(); + } + if (orcValue instanceof DateWritable) { + return ((DateWritable) orcValue).get().getTime(); + } + if (orcValue instanceof OrcTimestamp) { + return ((OrcTimestamp) orcValue).getTime(); + } + throw new IllegalArgumentException( + String.format("Illegal ORC value: %s, class: %s", orcValue, orcValue.getClass())); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java new file mode 100644 index 0000000..65f1222 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.utils.preprocess; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.pinot.spi.utils.StringUtils; + + +/** + * Override the Text comparison logic to compare with the decoded String instead of the byte array. 
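+ * <p>Editor's note: the default comparator orders Text values by their raw UTF-8 bytes, which can differ from
+ * String.compareTo for characters outside the Basic Multilingual Plane (String.compareTo compares UTF-16 code
+ * units rather than code points). Decoding first keeps the shuffle sort consistent with Java String ordering.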
+ */ +public class TextComparator extends WritableComparator { + public TextComparator() { + super(Text.class); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int n1 = WritableUtils.decodeVIntSize(b1[s1]); + int n2 = WritableUtils.decodeVIntSize(b2[s2]); + return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2)); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java index 2e9d98c..2e3b023 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java @@ -22,15 +22,11 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.util.Properties; -import java.util.zip.GZIPInputStream; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,26 +71,8 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { protected abstract void run() throws Exception; - /** - * Helper method that returns avro reader for the given avro file. - * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader. - * - * @param avroFile File to read - * @return Avro reader for the file. - * @throws IOException exception when accessing to IO - */ - protected DataFileStream<GenericRecord> getAvroReader(Path avroFile) - throws IOException { - FileSystem fs = FileSystem.get(new Configuration()); - if (avroFile.getName().endsWith("gz")) { - return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>()); - } else { - return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>()); - } - } - @Override - protected org.apache.pinot.spi.data.Schema getSchema() + protected Schema getSchema() throws IOException { try (ControllerRestApi controllerRestApi = getControllerRestApi()) { if (controllerRestApi != null) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java index f966f9b..7af16df 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java @@ -502,6 +502,13 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { throw new IllegalArgumentException(String.format("Cannot convert value: '%s' to type: %s", value, this)); } } + + /** + * Checks whether the data type can be a sorted column. 
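+ * <p>Editor's note: every data type other than BYTES, JSON, STRUCT, MAP and LIST qualifies.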
+ */ + public boolean canBeASortedColumn() { + return this != BYTES && this != JSON && this != STRUCT && this != MAP && this != LIST; + } } @Override diff --git a/pom.xml b/pom.xml index a116022..d3013da 100644 --- a/pom.xml +++ b/pom.xml @@ -950,6 +950,11 @@ <classifier>hadoop2</classifier> </dependency> <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + <version>1.5.9</version> + </dependency> + <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.13</version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org