Jackie-Jiang commented on a change in pull request #7013: URL: https://github.com/apache/incubator-pinot/pull/7013#discussion_r645160191
########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java ########## @@ -40,60 +41,39 @@ * A Collector implementation for collecting and concatenating all incoming rows. */ public class ConcatCollector implements Collector { - private static final String RECORD_OFFSET_FILE_NAME = "record.offset"; - private static final String RECORD_DATA_FILE_NAME = "record.data"; - - private final List<FieldSpec> _fieldSpecs = new ArrayList<>(); private final int _numSortColumns; private final SortOrderComparator _sortOrderComparator; private final File _workingDir; - private final File _recordOffsetFile; - private final File _recordDataFile; + private final GenericRowFileManager _recordFileManager; private GenericRowFileWriter _recordFileWriter; private GenericRowFileReader _recordFileReader; private int _numDocs; public ConcatCollector(CollectorConfig collectorConfig, Schema schema) { List<String> sortOrder = collectorConfig.getSortOrder(); + List<FieldSpec> fieldSpecs; if (CollectionUtils.isNotEmpty(sortOrder)) { + fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder); _numSortColumns = sortOrder.size(); - DataType[] sortColumnStoredTypes = new DataType[_numSortColumns]; - for (int i = 0; i < _numSortColumns; i++) { - String sortColumn = sortOrder.get(i); - FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn); - Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn); - Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn); - sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType(); - _fieldSpecs.add(fieldSpec); - } - _sortOrderComparator = new SortOrderComparator(_numSortColumns, sortColumnStoredTypes); - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) { - _fieldSpecs.add(fieldSpec); - } - } + _sortOrderComparator = 
SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns); } else { + fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema); _numSortColumns = 0; _sortOrderComparator = null; - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn()) { - _fieldSpecs.add(fieldSpec); - } - } } _workingDir = new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis())); Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s for %s with config: %s", _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), collectorConfig); - _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME); - _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME); + // TODO: Pass 'includeNullFields' from the config Review comment: If the table config does not have null value handling enabled, we don't need to include null fields ########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java ########## @@ -55,98 +59,113 @@ * - partitioning */ public class SegmentMapper { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class); - private final File _inputSegment; + + private final List<RecordReader> _recordReaders; private final File _mapperOutputDir; - private final String _mapperId; - private final Schema _avroSchema; - private final RecordTransformer _recordTransformer; + private final List<FieldSpec> _fieldSpecs; + private final boolean _includeNullFields; + + // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case + // _recordTransformer changes the data type. 
+ private final CompositeTransformer _defaultRecordTransformer; private final RecordFilter _recordFilter; - private final int _numPartitioners; + private final RecordTransformer _recordTransformer; + private final DataTypeTransformer _dataTypeTransformer; + private final List<Partitioner> _partitioners = new ArrayList<>(); - private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>(); + private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>(); - public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) { - _inputSegment = inputSegment; + public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) { Review comment: We can still have multiple mappers running in parallel if required. The benefit of running a single mapper is to produce one file per partition, which can simplify the reducer handling, and save the step of merging multiple files into one. Since all the mappers are running on the same host and doing mostly IO operations, running them in parallel might not improve the performance. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java ########## @@ -55,98 +59,113 @@ * - partitioning */ public class SegmentMapper { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class); - private final File _inputSegment; + + private final List<RecordReader> _recordReaders; private final File _mapperOutputDir; - private final String _mapperId; - private final Schema _avroSchema; - private final RecordTransformer _recordTransformer; + private final List<FieldSpec> _fieldSpecs; + private final boolean _includeNullFields; + + // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case + // _recordTransformer changes the data type. 
+ private final CompositeTransformer _defaultRecordTransformer; private final RecordFilter _recordFilter; - private final int _numPartitioners; + private final RecordTransformer _recordTransformer; + private final DataTypeTransformer _dataTypeTransformer; + private final List<Partitioner> _partitioners = new ArrayList<>(); - private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>(); + private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>(); - public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) { - _inputSegment = inputSegment; + public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) { Review comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org