Jackie-Jiang commented on a change in pull request #7013:
URL: https://github.com/apache/incubator-pinot/pull/7013#discussion_r645809231
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  // _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();

-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _recordReaders = recordReaders;
     _mapperOutputDir = mapperOutputDir;
-    _mapperId = mapperId;
-    _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    TableConfig tableConfig = mapperConfig.getTableConfig();
+    Schema schema = mapperConfig.getSchema();
+    List<String> sortOrder = tableConfig.getIndexingConfig().getSortedColumn();
+    if (CollectionUtils.isNotEmpty(sortOrder)) {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
+    } else {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
+    }
+    _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
+    _defaultRecordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
     _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _dataTypeTransformer = new DataTypeTransformer(schema);
     for (PartitionerConfig partitionerConfig : mapperConfig.getPartitionerConfigs()) {
       _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
     }
-    _numPartitioners = _partitioners.size();
     LOGGER.info(
-        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioners: {}",
-        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
+        "Initialized mapper with {} record readers, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioners: {}",
+        _recordReaders.size(), _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
         _partitioners.stream().map(p -> p.getClass().toString()).collect(Collectors.joining(",")));
   }

   /**
    * Reads the input segment and generates partitioned avro data files into the mapper output directory
    * Records for each partition are put into a directory of its own withing the mapper output directory, identified by the partition name
    */
-  public void map()
+  public Map<String, GenericRowFileManager> map()
       throws Exception {
-
-    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
-    GenericRow reusableRow = new GenericRow();
-    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
-    String[] partitions = new String[_numPartitioners];
-
-    while (segmentRecordReader.hasNext()) {
-      reusableRow = segmentRecordReader.next(reusableRow);
-
-      // Record filtering
-      if (_recordFilter.filter(reusableRow)) {
-        continue;
-      }
-
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
-      // Partitioning
-      int p = 0;
-      for (Partitioner partitioner : _partitioners) {
-        partitions[p++] = partitioner.getPartition(reusableRow);
-      }
-      String partition = StringUtil.join("_", partitions);
-
-      // Create writer for the partition, if not exists
-      DataFileWriter<GenericData.Record> recordWriter = _partitionToDataFileWriterMap.get(partition);
-      if (recordWriter == null) {
-        File partDir = new File(_mapperOutputDir, partition);
-        if (!partDir.exists()) {
-          Files.createDirectory(Paths.get(partDir.getAbsolutePath()));
+    GenericRow reuse = new GenericRow();
+    for (RecordReader recordReader : _recordReaders) {
+      while (recordReader.hasNext()) {
+        reuse = recordReader.next(reuse);
+
+        // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
+
+        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+          //noinspection unchecked
+          for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            GenericRow transformedRow = _defaultRecordTransformer.transform(row);
+            if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+                .filter(transformedRow)) {
+              writeRecord(transformedRow);
+            }
+          }
+        } else {
+          GenericRow transformedRow = _defaultRecordTransformer.transform(reuse);
+          if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+              .filter(transformedRow)) {
+            writeRecord(transformedRow);
+          }
         }
-        recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
-        recordWriter.create(_avroSchema, new File(partDir, createMapperOutputFileName(_mapperId)));
-        _partitionToDataFileWriterMap.put(partition, recordWriter);
-      }

-      // Write record to avro file for its partition
-      SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(reusableRow, reusableRecord);
-      recordWriter.append(reusableRecord);
+        reuse.clear();
+      }
+    }

-      reusableRow.clear();
+    for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+      fileManager.closeFileWriter();
     }
+
+    return _partitionToFileManagerMap;
   }

-  /**
-   * Cleanup the mapper state
-   */
-  public void cleanup()
+  private void writeRecord(GenericRow row)
       throws IOException {
-    for (DataFileWriter<GenericData.Record> recordDataFileWriter : _partitionToDataFileWriterMap.values()) {
-      recordDataFileWriter.close();
+    // Record transformation
+    row = _dataTypeTransformer.transform(_recordTransformer.transformRecord(row));
+
+    // Partitioning
+    int numPartitioners = _partitioners.size();
+    String[] partitions = new String[numPartitioners];

Review comment:
       Added reusable `_partitionsBuffer`
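       To illustrate the point of the comment: allocating the partition-name array once per mapper, rather than once per record inside `writeRecord`, avoids a per-record allocation. The sketch below is not the actual patch; `PartitionsBufferSketch`, its nested `Partitioner` interface, and `partitionNameFor` are simplified stand-ins for the Pinot classes in the diff, and `String.join` stands in for the `StringUtil.join` used above.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal, self-contained sketch of the reusable partitions buffer idea (hypothetical class,
// not part of Pinot). The real SegmentMapper would hold the buffer as a field next to _partitioners.
public class PartitionsBufferSketch {
  public interface Partitioner {
    String getPartition(Object row);
  }

  private final List<Partitioner> _partitioners = new ArrayList<>();
  // Allocated once in the constructor instead of once per record in writeRecord().
  private final String[] _partitionsBuffer;

  public PartitionsBufferSketch(List<Partitioner> partitioners) {
    _partitioners.addAll(partitioners);
    _partitionsBuffer = new String[_partitioners.size()];
  }

  // Builds the partition name for a record, e.g. "p0_p1", reusing the buffer across calls.
  public String partitionNameFor(Object row) {
    int numPartitioners = _partitioners.size();
    for (int i = 0; i < numPartitioners; i++) {
      _partitionsBuffer[i] = _partitioners.get(i).getPartition(row);
    }
    return String.join("_", _partitionsBuffer);
  }
}
```

       The trade-off is that the buffer makes the mapper non-thread-safe for concurrent calls, which matches how the mapper is used here (one record at a time per instance).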