Jackie-Jiang commented on a change in pull request #7013:
URL: https://github.com/apache/incubator-pinot/pull/7013#discussion_r645160191



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
##########
@@ -40,60 +41,39 @@
 * A Collector implementation for collecting and concatenating all incoming rows.
  */
 public class ConcatCollector implements Collector {
-  private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
-  private static final String RECORD_DATA_FILE_NAME = "record.data";
-
-  private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
   private final int _numSortColumns;
   private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
-  private final File _recordOffsetFile;
-  private final File _recordDataFile;
+  private final GenericRowFileManager _recordFileManager;
 
   private GenericRowFileWriter _recordFileWriter;
   private GenericRowFileReader _recordFileReader;
   private int _numDocs;
 
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
     List<String> sortOrder = collectorConfig.getSortOrder();
+    List<FieldSpec> fieldSpecs;
     if (CollectionUtils.isNotEmpty(sortOrder)) {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
       _numSortColumns = sortOrder.size();
-      DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
-      for (int i = 0; i < _numSortColumns; i++) {
-        String sortColumn = sortOrder.get(i);
-        FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
-        Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn);
-        Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
-        sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
-        _fieldSpecs.add(fieldSpec);
-      }
-      _sortOrderComparator = new SortOrderComparator(_numSortColumns, sortColumnStoredTypes);
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
+      _sortOrderComparator = SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
     } else {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
       _numSortColumns = 0;
       _sortOrderComparator = null;
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn()) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
     }
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis()));
     Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s for %s with config: %s",
         _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), collectorConfig);
-    _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
-    _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
 
+    // TODO: Pass 'includeNullFields' from the config

Review comment:
       If the table config does not have null value handling enabled, we don't need to include null fields.
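
   For context, here is a minimal sketch of what resolving the TODO could look
   like, assuming the collector has access to the TableConfig. The helper class
   below is hypothetical; isNullHandlingEnabled() is the existing flag on
   IndexingConfig, but the actual wiring in this PR may differ:

       import org.apache.pinot.spi.config.table.TableConfig;

       public final class NullFieldUtils {
         private NullFieldUtils() {
         }

         /**
          * Derives 'includeNullFields' from the table-level null value handling
          * flag, so tables without null handling enabled skip the extra
          * null-field bookkeeping when writing records.
          */
         public static boolean includeNullFields(TableConfig tableConfig) {
           return tableConfig.getIndexingConfig().isNullHandlingEnabled();
         }
       }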

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {

Review comment:
       We can still have multiple mappers running in parallel if required (see the sketch below).
   The benefit of running a single mapper is that it produces one file per partition, which simplifies the reducer handling and saves the step of merging multiple files into one.
   Since all the mappers run on the same host and mostly do IO operations, running them in parallel might not improve performance.
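
   For illustration, a minimal sketch of how multiple mappers could still run
   in parallel if required. It assumes each mapper has its own output directory
   and a map() entry point (assumed names; not part of this PR):

       import java.util.ArrayList;
       import java.util.List;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;
       import java.util.concurrent.Future;

       public final class ParallelMapperRunner {
         /** Runs each mapper on its own thread and waits for all of them to finish. */
         public static void runInParallel(List<SegmentMapper> mappers, int numThreads)
             throws Exception {
           ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
           try {
             List<Future<Void>> futures = new ArrayList<>();
             for (SegmentMapper mapper : mappers) {
               // Each mapper reads its own record readers and writes one file per
               // partition under its own output directory.
               futures.add(executorService.submit(() -> {
                 mapper.map();
                 return null;
               }));
             }
             for (Future<Void> future : futures) {
               future.get(); // Propagates any mapper failure.
             }
           } finally {
             executorService.shutdownNow();
           }
         }
       }

   Note the trade-off called out above: with one mapper per input, the reducer
   still has to merge multiple files per partition, which the single-mapper
   design avoids.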

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {

Review comment:
       Added
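
   For context, the TODO in the diff above implies a per-row pipeline roughly
   like the following (a sketch based on the fields in this diff; the filter()
   and transform() method names are assumed, not confirmed by this PR):

       /**
        * Sketch of the per-row flow: run the default transformers, drop filtered
        * rows, apply the custom transformer, then re-apply DataTypeTransformer in
        * case the custom transformer changed a value's data type.
        */
       private GenericRow processRow(GenericRow row) {
         row = _defaultRecordTransformer.transform(row);
         if (_recordFilter.filter(row)) {
           return null; // Row is filtered out and not written to any partition.
         }
         row = _recordTransformer.transform(row);
         return _dataTypeTransformer.transform(row);
       }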



