This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-orc-format-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 83609aaccb1929b5aaea15b07b500fed501e6f62 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Tue Jun 15 10:02:47 2021 -0700 Support data preprocessing for AVRO and ORC formats --- .../v0_deprecated/pinot-hadoop/pom.xml | 4 + .../hadoop/job/HadoopSegmentPreprocessingJob.java | 387 +++++++++++++-------- .../pinot/hadoop/job/InternalConfigConstants.java | 10 +- .../job/mappers/AvroDataPreprocessingMapper.java | 85 +++++ .../job/mappers/OrcDataPreprocessingMapper.java | 87 +++++ .../job/mappers/SegmentPreprocessingMapper.java | 53 ++- .../AvroDataPreprocessingPartitioner.java | 77 ++++ .../OrcDataPreprocessingPartitioner.java | 83 +++++ ...ucer.java => AvroDataPreprocessingReducer.java} | 45 +-- ...ducer.java => OrcDataPreprocessingReducer.java} | 57 +-- .../hadoop/utils/preprocess/DataFileUtils.java | 62 ++++ .../utils/preprocess/DataPreprocessingUtils.java | 76 ++++ .../pinot/hadoop/utils/preprocess/HadoopUtils.java | 41 +++ .../pinot/hadoop/utils/preprocess/OrcUtils.java | 88 +++++ .../hadoop/utils/preprocess/RawDataFormat.java | 26 ++ .../hadoop/utils/preprocess/TextComparator.java | 41 +++ pom.xml | 5 + 17 files changed, 1015 insertions(+), 212 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml index a0ea8ec..b8218e2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml @@ -193,6 +193,10 @@ <classifier>hadoop2</classifier> </dependency> <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </dependency> + <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> </dependency> diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java index b4e87fc..2bf603a 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java @@ -21,7 +21,6 @@ package org.apache.pinot.hadoop.job; import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,15 +30,20 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.avro.mapreduce.AvroKeyOutputFormat; import org.apache.avro.mapreduce.AvroMultipleOutputs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import 
org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -48,11 +52,25 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat; -import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.orc.mapreduce.OrcInputFormat; +import org.apache.orc.mapreduce.OrcOutputFormat; +import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner; import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; -import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer; +import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner; +import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer; +import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer; import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper; +import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.hadoop.utils.preprocess.RawDataFormat; +import org.apache.pinot.hadoop.utils.preprocess.TextComparator; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; @@ -65,28 +83,36 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format. + * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format. * Thus, the output files are partitioned, sorted, resized after this job. * In order to run this job, the following configs need to be specified in job properties: * * enable.preprocessing: false by default. Enables preprocessing job. 
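 * For illustration, the remaining settings added by this patch are read from the table's
 * customConfigs rather than from job properties:
 *   preprocessing.operations: comma-separated list of enabled operations ("partition", "sort", "resize").
 *   preprocessing.num.reducers: optional number of output files when resizing is enabled.
 *   preprocessing.max.num.records.per.file: optional cap on the number of records per output file.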
*/ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { - private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); - protected FileSystem _fileSystem; + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); + private String _partitionColumn; private int _numPartitions; private String _partitionFunction; - private String _sortedColumn; + + private String _sortingColumn; + private FieldSpec.DataType _sortingColumnType; + private int _numOutputFiles; + private int _maxNumRecordsPerFile; + private TableConfig _tableConfig; private org.apache.pinot.spi.data.Schema _pinotTableSchema; + private Set<String> _preprocessingOperations; + public HadoopSegmentPreprocessingJob(final Properties properties) { super(properties); } @@ -94,56 +120,102 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { public void run() throws Exception { if (!_enablePreprocessing) { - _logger.info("Pre-processing job is disabled."); + LOGGER.info("Pre-processing job is disabled."); return; } else { - _logger.info("Starting {}", getClass().getSimpleName()); + LOGGER.info("Starting {}", getClass().getSimpleName()); } - _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf()); - final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir); - Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory."); - - if (_fileSystem.exists(_preprocessedOutputDir)) { - _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir); - _fileSystem.delete(_preprocessedOutputDir, true); - } setTableConfigAndSchema(); - - _logger.info("Initializing a pre-processing job"); - Job job = Job.getInstance(getConf()); - - Path sampleAvroPath = inputDataPaths.get(0); - int numInputPaths = inputDataPaths.size(); - - setValidationConfigs(job, sampleAvroPath); - setHadoopJobConfigs(job, numInputPaths); - - // Avro Schema configs. - Schema avroSchema = getSchema(sampleAvroPath); - _logger.info("Schema is: {}", avroSchema.toString(true)); - setSchemaParams(job, avroSchema); - - // Set up mapper output key - Set<Schema.Field> fieldSet = new HashSet<>(); - + fetchPreProcessingOperations(); fetchPartitioningConfig(); fetchSortingConfig(); fetchResizingConfig(); - validateConfigsAgainstSchema(avroSchema); - // Partition configs. 
+ // Cleans up preprocessed output dir if exists + cleanUpPreprocessedOutputs(_preprocessedOutputDir); + + final List<Path> avroFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.AVRO_FILE_EXTENSION); + final List<Path> orcFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles, + _inputSegmentDir); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + _inputSegmentDir); + List<Path> inputDataPaths; + RawDataFormat rawDataFormat; + if (numAvroFiles > 0) { + rawDataFormat = RawDataFormat.AVRO; + inputDataPaths = avroFiles; + LOGGER.info("Find AVRO files: {} in directories: {}", avroFiles, _inputSegmentDir); + } else { + rawDataFormat = RawDataFormat.ORC; + inputDataPaths = orcFiles; + LOGGER.info("Find ORC files: {} in directories: {}", orcFiles, _inputSegmentDir); + } + + LOGGER.info("Initializing a pre-processing job"); + Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); + Configuration jobConf = job.getConfiguration(); + // Input and output paths. + Path sampleRawDataPath = inputDataPaths.get(0); + int numInputPaths = inputDataPaths.size(); + setValidationConfigs(job, sampleRawDataPath); + for (Path inputFile : inputDataPaths) { + FileInputFormat.addInputPath(job, inputFile); + } + setHadoopJobConfigs(job); + + // Sorting column + if (_sortingColumn != null) { + LOGGER.info("Adding sorting column: {} to job config", _sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name()); + + switch (_sortingColumnType) { + case INT: + job.setMapOutputKeyClass(IntWritable.class); + break; + case LONG: + job.setMapOutputKeyClass(LongWritable.class); + break; + case FLOAT: + job.setMapOutputKeyClass(FloatWritable.class); + break; + case DOUBLE: + job.setMapOutputKeyClass(DoubleWritable.class); + break; + case STRING: + job.setMapOutputKeyClass(Text.class); + job.setSortComparatorClass(TextComparator.class); + break; + default: + throw new IllegalStateException(); + } + } else { + job.setMapOutputKeyClass(NullWritable.class); + } + + // Partition column int numReduceTasks = 0; if (_partitionColumn != null) { numReduceTasks = _numPartitions; - job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); + jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); job.setPartitionerClass(GenericPartitioner.class); - job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); if (_partitionFunction != null) { - job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); + jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); + } + jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); + if (rawDataFormat == RawDataFormat.AVRO) { + job.setPartitionerClass(AvroDataPreprocessingPartitioner.class); + } else { + job.setPartitionerClass(OrcDataPreprocessingPartitioner.class); } - job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks)); - 
setMaxNumRecordsConfigIfSpecified(job); } else { if (_numOutputFiles > 0) { numReduceTasks = _numOutputFiles; @@ -151,45 +223,65 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { // default number of input paths numReduceTasks = inputDataPaths.size(); } - // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key. - // so that all the rows can be spread evenly. - addHashCodeField(fieldSet); } - job.setInputFormatClass(CombineAvroKeyInputFormat.class); - - _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); + // Maximum number of records per output file + jobConf + .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile)); + // Number of reducers + LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); job.setNumReduceTasks(numReduceTasks); - // Sort config. - if (_sortedColumn != null) { - _logger.info("Adding sorted column: {} to job config", _sortedColumn); - job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn); - - addSortedColumnField(avroSchema, fieldSet); + // Mapper and reducer configs. + if (rawDataFormat == RawDataFormat.AVRO) { + Schema avroSchema = getAvroSchema(sampleRawDataPath); + LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); + + // Set up mapper output key + validateConfigsAgainstSchema(avroSchema); + + job.setInputFormatClass(AvroKeyInputFormat.class); + job.setMapperClass(AvroDataPreprocessingMapper.class); + jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); + + job.setReducerClass(AvroDataPreprocessingReducer.class); + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroJob.setInputKeySchema(job, avroSchema); + AvroJob.setMapOutputValueSchema(job, avroSchema); + AvroJob.setOutputKeySchema(job, avroSchema); } else { - // If sorting is disabled, hashcode will be the only factor for sort/group comparator. - addHashCodeField(fieldSet); + String orcSchemaString = getOrcSchemaString(sampleRawDataPath); + LOGGER.info("Orc schema is: {}", orcSchemaString); + + job.setInputFormatClass(OrcInputFormat.class); + job.setMapperClass(OrcDataPreprocessingMapper.class); + job.setMapOutputValueClass(OrcValue.class); + jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + + job.setReducerClass(OrcDataPreprocessingReducer.class); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcStruct.class); + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); } - // Creates a wrapper for the schema of output key in mapper. 
- Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false); - mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet)); - _logger.info("Mapper output schema: {}", mapperOutputKeySchema); - - AvroJob.setInputKeySchema(job, avroSchema); - AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema); - AvroJob.setMapOutputValueSchema(job, avroSchema); - AvroJob.setOutputKeySchema(job, avroSchema); - // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is // implemented so that you know what it does. - _logger.info("HDFS class path: " + _pathToDependencyJar); + LOGGER.info("HDFS class path: " + _pathToDependencyJar); if (_pathToDependencyJar != null) { - _logger.info("Copying jars locally."); - PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar); + LOGGER.info("Copying jars locally."); + PinotHadoopJobPreparationHelper + .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar); } else { - _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR); + LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR); } long startTime = System.currentTimeMillis(); @@ -199,11 +291,31 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { throw new RuntimeException("Job failed : " + job); } - _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); + LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); + } + + private void fetchPreProcessingOperations() { + _preprocessingOperations = new HashSet<>(); + TableCustomConfig customConfig = _tableConfig.getCustomConfig(); + if (customConfig != null) { + Map<String, String> customConfigMap = customConfig.getCustomConfigs(); + if (customConfigMap != null && !customConfigMap.isEmpty()) { + String preprocessingOperationsString = + customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); + String[] preprocessingOpsArray = preprocessingOperationsString.split(","); + for (String preprocessingOps : preprocessingOpsArray) { + _preprocessingOperations.add(preprocessingOps.trim().toLowerCase()); + } + } + } } private void fetchPartitioningConfig() { // Fetch partition info from table config. + if (!_preprocessingOperations.contains("partition")) { + LOGGER.info("Partitioning is disabled."); + return; + } SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); if (segmentPartitionConfig != null) { Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); @@ -215,55 +327,63 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); } } else { - _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); + LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); } } private void fetchSortingConfig() { + if (!_preprocessingOperations.contains("sort")) { + LOGGER.info("Sorting is disabled."); + return; + } // Fetch sorting info from table config. 
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); List<String> sortedColumns = indexingConfig.getSortedColumn(); if (sortedColumns != null) { Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); if (sortedColumns.size() == 1) { - _sortedColumn = sortedColumns.get(0); + _sortingColumn = sortedColumns.get(0); + FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); + Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); + Preconditions + .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); + _sortingColumnType = fieldSpec.getDataType(); + Preconditions.checkState(_sortingColumnType != FieldSpec.DataType.BYTES, "Cannot sort on BYTES column: %s", + _sortingColumn); + LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); } } } private void fetchResizingConfig() { + if (!_preprocessingOperations.contains("resize")) { + LOGGER.info("Resizing is disabled."); + return; + } TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); if (tableCustomConfig == null) { _numOutputFiles = 0; return; } Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESS_NUM_FILES)) { - _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESS_NUM_FILES)); + if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { + _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); Preconditions.checkState(_numOutputFiles > 0, String - .format("The value of %s should be positive! Current value: %s", InternalConfigConstants.PREPROCESS_NUM_FILES, - _numOutputFiles)); + .format("The value of %s should be positive! Current value: %s", + InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); } else { _numOutputFiles = 0; } - } - private void setMaxNumRecordsConfigIfSpecified(Job job) { - TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); - if (tableCustomConfig == null) { - return; - } - Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); if (customConfigsMap != null && customConfigsMap - .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { + .containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { int maxNumRecords = - Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); + Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); Preconditions.checkArgument(maxNumRecords > 0, - "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE + "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE + " should be positive. 
Current value: " + maxNumRecords); - _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords); - job.getConfiguration() - .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords)); + LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); + _maxNumRecordsPerFile = maxNumRecords; } } @@ -273,13 +393,12 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { * @return Input schema * @throws IOException exception when accessing to IO */ - private Schema getSchema(Path inputPathDir) + private Schema getAvroSchema(Path inputPathDir) throws IOException { - FileSystem fs = FileSystem.get(new Configuration()); Schema avroSchema = null; - for (FileStatus fileStatus : fs.listStatus(inputPathDir)) { + for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) { if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) { - _logger.info("Extracting schema from " + fileStatus.getPath()); + LOGGER.info("Extracting schema from " + fileStatus.getPath()); try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) { avroSchema = dataStreamReader.getSchema(); } @@ -289,19 +408,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { return avroSchema; } - private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) { - // Sorting is enabled. Adding sorted column value to mapper output key. - Schema sortedColumnSchema = schema.getField(_sortedColumn).schema(); - Schema sortedColumnAsKeySchema; - if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) { - sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes()); - } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) { - sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType()); - } else { - sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType()); + /** + * Finds the orc file and return its orc schema. 
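 * For illustration, the returned string is the textual ORC TypeDescription, e.g.
 * "struct<memberId:bigint,country:string>" (the column names here are hypothetical); it is later
 * passed to OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA and OrcConf.MAPRED_OUTPUT_SCHEMA.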
+ */ + private String getOrcSchemaString(Path orcFile) { + String orcSchemaString; + try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { + orcSchemaString = reader.getSchema().toString(); + } catch (Exception e) { + throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e); } - Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null); - fieldSet.add(columnField); + return orcSchemaString; } private void validateConfigsAgainstSchema(Schema schema) { @@ -314,22 +431,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { try { PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction); } catch (IllegalArgumentException e) { - _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}", + LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}", _partitionColumn); throw new IllegalArgumentException(e); } } - if (_sortedColumn != null) { - Preconditions.checkArgument(schema.getField(_sortedColumn) != null, - String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn)); + if (_sortingColumn != null) { + Preconditions.checkArgument(schema.getField(_sortingColumn) != null, + String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn)); } } - private void addHashCodeField(Set<Schema.Field> fieldSet) { - Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null); - fieldSet.add(hashCodeField); - } - @Override protected org.apache.pinot.spi.data.Schema getSchema() throws IOException { @@ -376,12 +488,10 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); if (dateTimeFieldSpec != null) { DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); job.getConfiguration() .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); } } job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, @@ -393,41 +503,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { } } - private void setHadoopJobConfigs(Job job, int numInputPaths) { + private void setHadoopJobConfigs(Job job) { + job.setJarByClass(HadoopSegmentPreprocessingJob.class); + job.setJobName(getClass().getName()); + FileOutputFormat.setOutputPath(job, _preprocessedOutputDir); job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); // Turn this on to always firstly use class paths that user specifies. 
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); // Turn this off since we don't need an empty file in the output directory job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); - job.setJarByClass(HadoopSegmentPreprocessingJob.class); - String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); if (hadoopTokenFileLocation != null) { job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); } - - // Mapper configs. - job.setMapperClass(SegmentPreprocessingMapper.class); - job.setMapOutputKeyClass(AvroKey.class); - job.setMapOutputValueClass(AvroValue.class); - job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths); - - // Reducer configs. - job.setReducerClass(SegmentPreprocessingReducer.class); - job.setOutputKeyClass(AvroKey.class); - job.setOutputValueClass(NullWritable.class); } - private void setSchemaParams(Job job, Schema avroSchema) + /** + * Cleans up outputs in preprocessed output directory. + */ + public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir) throws IOException { - AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); - AvroMultipleOutputs.setCountersEnabled(job, true); - // Use LazyOutputFormat to avoid creating empty files. - LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); - - // Input and output paths. - FileInputFormat.setInputPaths(job, _inputSegmentDir); - FileOutputFormat.setOutputPath(job, _preprocessedOutputDir); + if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) { + LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir); + HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true); + } } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java index 3a7938d..43009e8 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java @@ -31,19 +31,23 @@ public class InternalConfigConstants { public static final String SEGMENT_TIME_FORMAT = "segment.time.format"; public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern"; + // The operations of preprocessing that is enabled. + public static final String PREPROCESS_OPERATIONS = "preprocessing.operations"; + // Partitioning configs public static final String PARTITION_COLUMN_CONFIG = "partition.column"; public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; public static final String PARTITION_FUNCTION_CONFIG = "partition.function"; - public static final String SORTED_COLUMN_CONFIG = "sorted.column"; + public static final String SORTING_COLUMN_CONFIG = "sorting.column"; + public static final String SORTING_COLUMN_TYPE = "sorting.type"; public static final String ENABLE_PARTITIONING = "enable.partitioning"; // max records per file in each partition. No effect otherwise. 
- public static final String PARTITION_MAX_RECORDS_PER_FILE = "partition.max.records.per.file"; + public static final String PREPROCESSING_MAX_NUM_RECORDS_PER_FILE = "preprocessing.max.num.records.per.file"; // Number of segments we want generated. - public static final String PREPROCESS_NUM_FILES = "preprocess.num.files"; + public static final String PREPROCESSING_NUM_REDUCERS = "preprocessing.num.reducers"; public static final String FAIL_ON_SCHEMA_MISMATCH = "fail.on.schema.mismatch"; } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java new file mode 100644 index 0000000..6278e8e --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.mappers; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class); + + private String _sortingColumn = null; + private FieldSpec.DataType _sortingColumnType = null; + private AvroRecordExtractor _avroRecordExtractor; + + @Override + public void setup(Context context) { + Configuration configuration = context.getConfiguration(); + _avroRecordExtractor = new AvroRecordExtractor(); + String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); + if (sortingColumnConfig != null) { + _sortingColumn = sortingColumnConfig; + _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE)); + LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn, + _sortingColumnType); + } else { + LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column"); + } + } + + @Override + public void map(AvroKey<GenericRecord> key, NullWritable value, Context context) + throws IOException, InterruptedException { + GenericRecord record = key.datum(); + if (_sortingColumn != null) { + Object object = record.get(_sortingColumn); + Preconditions + .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn, + record); + Object convertedValue = _avroRecordExtractor.convert(object); + Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object, + _sortingColumn, record); + WritableComparable outputKey; + try { + outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record), + e); + } + context.write(outputKey, new AvroValue<>(record)); + } else { + context.write(NullWritable.get(), new AvroValue<>(record)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java new file mode 100644 index 0000000..d7d0694 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.mappers; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class); + + private final OrcValue _valueWrapper = new OrcValue(); + private String _sortingColumn = null; + private FieldSpec.DataType _sortingColumnType = null; + private int _sortingColumnId = -1; + + @Override + public void setup(Context context) { + Configuration configuration = context.getConfiguration(); + String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); + if (sortingColumnConfig != null) { + _sortingColumn = sortingColumnConfig; + _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE)); + LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn, + _sortingColumnType); + } else { + LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column"); + } + } + + @Override + public void map(NullWritable key, OrcStruct value, Context context) + throws IOException, InterruptedException { + _valueWrapper.value = value; + if (_sortingColumn != null) { + if (_sortingColumnId == -1) { + List<String> fieldNames = value.getSchema().getFieldNames(); + _sortingColumnId = fieldNames.indexOf(_sortingColumn); + Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s", + _sortingColumn, fieldNames); + LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId); + } + WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId); + WritableComparable outputKey; + try { + outputKey = DataPreprocessingUtils + .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType); + } catch (Exception e) { + throw new IllegalStateException(String + .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn, + _sortingColumnId, value), e); + } + context.write(outputKey, _valueWrapper); + } else { + 
context.write(NullWritable.get(), _valueWrapper); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java index e3f088d..6decee1 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java @@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class); + private Configuration _jobConf; private String _sortedColumn = null; private String _timeColumn = null; private Schema _outputKeySchema; @@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N @Override public void setup(final Context context) { - Configuration configuration = context.getConfiguration(); - - String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME); - - _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true"); + _jobConf = context.getConfiguration(); + logConfigurations(); + String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME); + _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND)); if (_isAppend) { // Get time column name - _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG); + _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG); // Get sample time column value - String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE); - String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY); + String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE); + String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY); - String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE); - String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT); + String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE); + String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT); DateTimeFormatSpec dateTimeFormatSpec; if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) { dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat); } else { dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat, - configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN)); + _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN)); } _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec); _sampleNormalizedTimeColumnValue = 
_normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue); } - String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG); + String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG); // Logging the configs for the mapper LOGGER.info("Sorted Column: " + sortedColumn); if (sortedColumn != null) { _sortedColumn = sortedColumn; } - _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration); - _outputSchema = AvroJob.getMapOutputValueSchema(configuration); - _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false")); + _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf); + _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf); + _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false")); } @Override @@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N throw e; } } + + protected void logConfigurations() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append('{'); + boolean firstEntry = true; + for (Map.Entry<String, String> entry : _jobConf) { + if (!firstEntry) { + stringBuilder.append(", "); + } else { + firstEntry = false; + } + + stringBuilder.append(entry.getKey()); + stringBuilder.append('='); + stringBuilder.append(entry.getValue()); + } + stringBuilder.append('}'); + + LOGGER.info("*********************************************************************"); + LOGGER.info("Job Configurations: {}", stringBuilder.toString()); + LOGGER.info("*********************************************************************"); + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java new file mode 100644 index 0000000..74799c7 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.partitioners; + +import com.google.common.base.Preconditions; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class); + + private Configuration _conf; + private String _partitionColumn; + private PartitionFunction _partitionFunction; + private AvroRecordExtractor _avroRecordExtractor; + + @Override + public void setConf(Configuration conf) { + _conf = conf; + _avroRecordExtractor = new AvroRecordExtractor(); + _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG); + String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG); + int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG)); + _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions); + LOGGER.info( + "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}", + _partitionColumn, partitionFunctionName, numPartitions); + } + + @Override + public Configuration getConf() { + return _conf; + } + + @Override + public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) { + GenericRecord record = value.datum(); + Object object = record.get(_partitionColumn); + Preconditions + .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn, + record); + Object convertedValue = _avroRecordExtractor.convert(object); + Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object, + _partitionColumn, record); + Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String, + "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn, + convertedValue.getClass(), record); + // NOTE: Always partition with String type value because Broker uses String type value to prune segments + return _partitionFunction.getPartition(convertedValue.toString()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java new file mode 100644 index 0000000..bef2cef --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.partitioners; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class); + + private Configuration _conf; + private String _partitionColumn; + private PartitionFunction _partitionFunction; + private int _partitionColumnId = -1; + + @Override + public void setConf(Configuration conf) { + _conf = conf; + _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG); + String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG); + int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG)); + _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions); + LOGGER.info( + "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}", + _partitionColumn, partitionFunctionName, numPartitions); + } + + @Override + public Configuration getConf() { + return _conf; + } + + @Override + public int getPartition(WritableComparable key, OrcValue value, int numPartitions) { + OrcStruct orcStruct = (OrcStruct) value.value; + if (_partitionColumnId == -1) { + List<String> fieldNames = orcStruct.getSchema().getFieldNames(); + _partitionColumnId = fieldNames.indexOf(_partitionColumn); + Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s", + _partitionColumn, fieldNames); + LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId); + } + WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId); + Object convertedValue; + try { + convertedValue = OrcUtils.convert(partitionColumnValue); + } catch (Exception e) { + throw new IllegalStateException(String + .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn, + _partitionColumnId, orcStruct), e); + } + // NOTE: Always partition with String type value because Broker uses String type value to prune segments + return 
_partitionFunction.getPartition(convertedValue.toString()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java similarity index 63% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java index 9c07e6f..5fcbf10 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java @@ -19,7 +19,6 @@ package org.apache.pinot.hadoop.job.reducers; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; @@ -33,33 +32,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class); +public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class); private AvroMultipleOutputs _multipleOutputs; - private AtomicInteger _counter; - private int _maxNumberOfRecords; + private long _numRecords; + private int _maxNumRecordsPerFile; private String _filePrefix; @Override public void setup(Context context) { - LOGGER.info("Using multiple outputs strategy."); Configuration configuration = context.getConfiguration(); - _multipleOutputs = new AvroMultipleOutputs(context); - _counter = new AtomicInteger(); // If it's 0, the output file won't be split into multiple files. // If not, output file will be split when the number of records reaches this number. 
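      // For illustration: with preprocessing.max.num.records.per.file = 2 and a random file prefix
      // of "ab1z", records are written to files "ab1z0", "ab1z0", "ab1z1", "ab1z1", "ab1z2", ...
      // because the file index is computed as _numRecords / _maxNumRecordsPerFile.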
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new AvroMultipleOutputs(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
   public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final AvroValue<GenericRecord> value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+      }
+    } else {
+      for (final AvroValue<GenericRecord> value : values) {
+        context.write(new AvroKey<>(value.datum()), NullWritable.get());
+      }
     }
   }
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..a3387a2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@ package org.apache.pinot.hadoop.job.reducers;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
 
-  private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private int _maxNumRecordsPerFile;
+  private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+  private long _numRecords;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output file won't be split into multiple files.
     // If not, output file will be split when the number of records reaches this number.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new MultipleOutputs<>(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
-  public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+  public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final OrcValue value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+      }
+    } else {
+      for (final OrcValue value : values) {
+        context.write(NullWritable.get(), (OrcStruct) value.value);
+      }
     }
   }
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+  private DataFileUtils() {
+  }
+
+  public static final String AVRO_FILE_EXTENSION = ".avro";
+  public static final String ORC_FILE_EXTENSION = ".orc";
+
+  /**
+   * Returns the data files under the input directory with the given file extension.
+   */
+  public static List<Path> getDataFiles(Path inputDir, String dataFileExtension)
+      throws IOException {
+    FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+    Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+    List<Path> dataFiles = new ArrayList<>();
+    getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles);
+    return dataFiles;
+  }
+
+  private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles);
+      } else {
+        Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path);
+        if (path.getName().endsWith(dataFileExtension)) {
+          dataFiles.add(path);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..234e2d3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class DataPreprocessingUtils {
+  private DataPreprocessingUtils() {
+  }
+
+  /**
+   * Converts a value into {@link WritableComparable} based on the given data type.
+   * <p>NOTE: The passed in value must be either a Number or a String.
+   */
+  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+    if (value instanceof Number) {
+      Number numberValue = (Number) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(numberValue.intValue());
+        case LONG:
+          return new LongWritable(numberValue.longValue());
+        case FLOAT:
+          return new FloatWritable(numberValue.floatValue());
+        case DOUBLE:
+          return new DoubleWritable(numberValue.doubleValue());
+        case STRING:
+          return new Text(numberValue.toString());
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else if (value instanceof String) {
+      String stringValue = (String) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(Integer.parseInt(stringValue));
+        case LONG:
+          return new LongWritable(Long.parseLong(stringValue));
+        case FLOAT:
+          return new FloatWritable(Float.parseFloat(stringValue));
+        case DOUBLE:
+          return new DoubleWritable(Double.parseDouble(stringValue));
+        case STRING:
+          return new Text(stringValue);
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Value: %s must be either a Number or a String, found: %s", value, value.getClass()));
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class HadoopUtils {
+  private HadoopUtils() {
+  }
+
+  public static final Configuration DEFAULT_CONFIGURATION;
+  public static final FileSystem DEFAULT_FILE_SYSTEM;
+
+  static {
+    DEFAULT_CONFIGURATION = new Configuration();
+    try {
+      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get the default file system", e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+public class OrcUtils {
+  private OrcUtils() {
+  }
+
+  /**
+   * Converts the ORC value into Number or String.
+   * <p>The following ORC types are supported:
+   * <ul>
+   *   <li>IntWritable -> Integer</li>
+   *   <li>LongWritable -> Long</li>
+   *   <li>FloatWritable -> Float</li>
+   *   <li>DoubleWritable -> Double</li>
+   *   <li>Text -> String</li>
+   *   <li>BooleanWritable -> String</li>
+   *   <li>ByteWritable -> Byte</li>
+   *   <li>ShortWritable -> Short</li>
+   *   <li>DateWritable -> Long</li>
+   *   <li>OrcTimestamp -> Long</li>
+   * </ul>
+   */
+  public static Object convert(WritableComparable orcValue) {
+    if (orcValue instanceof IntWritable) {
+      return ((IntWritable) orcValue).get();
+    }
+    if (orcValue instanceof LongWritable) {
+      return ((LongWritable) orcValue).get();
+    }
+    if (orcValue instanceof FloatWritable) {
+      return ((FloatWritable) orcValue).get();
+    }
+    if (orcValue instanceof DoubleWritable) {
+      return ((DoubleWritable) orcValue).get();
+    }
+    if (orcValue instanceof Text) {
+      return orcValue.toString();
+    }
+    if (orcValue instanceof BooleanWritable) {
+      return Boolean.toString(((BooleanWritable) orcValue).get());
+    }
+    if (orcValue instanceof ByteWritable) {
+      return ((ByteWritable) orcValue).get();
+    }
+    if (orcValue instanceof ShortWritable) {
+      return ((ShortWritable) orcValue).get();
+    }
+    if (orcValue instanceof DateWritable) {
+      return ((DateWritable) orcValue).get().getTime();
+    }
+    if (orcValue instanceof OrcTimestamp) {
+      return ((OrcTimestamp) orcValue).getTime();
+    }
+    throw new IllegalArgumentException(
+        String.format("Illegal ORC value: %s, class: %s", orcValue, orcValue.getClass()));
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
new file mode 100644
index 0000000..0ae3ca3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+/**
+ * The current supported data formats in Pinot Preprocessing jobs
+ */
+public enum RawDataFormat {
+  AVRO, ORC
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Override the Text comparison logic to compare with the decoded String instead of the byte array.
+ */
+public class TextComparator extends WritableComparator {
+  public TextComparator() {
+    super(Text.class);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2));
+  }
+}
diff --git a/pom.xml b/pom.xml
index aaf122f..7337462 100644
--- a/pom.xml
+++ b/pom.xml
@@ -944,6 +944,11 @@
       <classifier>hadoop2</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+      <version>1.5.9</version>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
       <version>1.9.13</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
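
Illustration (not part of the patch): the two new conversion helpers compose in a single path. OrcUtils.convert normalizes an ORC writable into a plain Java Number or String, and DataPreprocessingUtils.convertToWritableComparable then re-wraps that value as the Hadoop WritableComparable matching the Pinot column's data type, which the new mappers and partitioners appear to use when building shuffle keys for the reducers above. A minimal sketch follows; the example class, its main() harness and the sample values are hypothetical, while the utility classes and FieldSpec.DataType come from this patch.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
import org.apache.pinot.spi.data.FieldSpec;

public class PreprocessingConversionExample {
  public static void main(String[] args) {
    // Step 1: OrcUtils.convert turns ORC writables into plain Java values.
    Object timeValue = OrcUtils.convert(new LongWritable(1623769367000L)); // Long
    Object countryValue = OrcUtils.convert(new Text("US"));                // String "US"

    // Step 2: convertToWritableComparable wraps the plain value as the writable
    // dictated by the (hypothetical) column's Pinot data type, suitable for use
    // as a mapper output key for partitioning and sorting.
    WritableComparable sortKey =
        DataPreprocessingUtils.convertToWritableComparable(timeValue, FieldSpec.DataType.LONG);
    WritableComparable partitionKey =
        DataPreprocessingUtils.convertToWritableComparable(countryValue, FieldSpec.DataType.STRING);

    System.out.println(sortKey);      // prints 1623769367000 (a LongWritable)
    System.out.println(partitionKey); // prints US (a Text)
  }
}

When the key ends up as a Text, the TextComparator added in this patch indicates that ordering is based on the decoded string rather than the serialized bytes.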