This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch segment-creation-in-one-pass in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7bcaa01d55070b36a1f3219e0cf33de8724e7084 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Fri Jan 22 13:04:21 2021 -0800 Support data ingestion for offline segment in one pass --- .../readers/IntermediateSegmentRecordReader.java | 78 +++++ .../generator/SegmentGeneratorConfig.java | 9 + .../indexsegment/mutable/IntermediateSegment.java | 373 +++++++++++++++++++++ .../RealtimeIndexOffHeapMemoryManager.java | 8 +- ...tatistics.java => MutableColumnStatistics.java} | 4 +- ....java => MutableNoDictionaryColStatistics.java} | 4 +- .../stats/RealtimeSegmentStatsContainer.java | 4 +- ...termediateSegmentSegmentCreationDataSource.java | 56 ++++ .../creator/IntermediateSegmentStatsContainer.java | 52 +++ .../creator/impl/SegmentColumnarIndexCreator.java | 2 +- .../impl/SegmentIndexCreationDriverImpl.java | 11 +- .../core/segment/creator/impl/V1Constants.java | 2 +- .../index/column/IntermediateIndexContainer.java | 134 ++++++++ .../core/segment/index/column/NumValuesInfo.java | 41 +++ .../segment/index/loader/IndexLoadingConfig.java | 3 + .../segment/index/metadata/ColumnMetadata.java | 2 +- .../core/indexsegment/IntermediateSegmentTest.java | 222 ++++++++++++ 17 files changed, 992 insertions(+), 13 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java new file mode 100644 index 0000000..0c2daaf --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.readers; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; + + +public class IntermediateSegmentRecordReader implements RecordReader { + private final IntermediateSegment _intermediateSegment; + private final int _numDocs; + + private int _nextDocId = 0; + + public IntermediateSegmentRecordReader(IntermediateSegment intermediateSegment) { + _intermediateSegment = intermediateSegment; + _numDocs = intermediateSegment.getNumDocsIndexed(); + } + + @Override + public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + } + + @Override + public boolean hasNext() { + return _nextDocId < _numDocs; + } + + @Override + public GenericRow next() + throws IOException { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { + return _intermediateSegment.getRecord(_nextDocId++, reuse); + } + + @Override + public void rewind() + throws IOException { + _nextDocId = 0; + } + + @Override + public void close() + throws IOException { + } + + public IntermediateSegment getIntermediateSegment() { + return _intermediateSegment; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java index c630a15..a65c312 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java @@ -102,6 +102,7 @@ public class SegmentGeneratorConfig implements Serializable { private boolean _onHeap = false; private boolean _skipTimeValueCheck = false; private boolean _nullHandlingEnabled = false; + private boolean _isIntermediateSegmentRecordReader = false; // constructed from FieldConfig private Map<String, Map<String, String>> _columnProperties = new HashMap<>(); @@ -650,4 +651,12 @@ public class SegmentGeneratorConfig implements Serializable { public void setNullHandlingEnabled(boolean nullHandlingEnabled) { _nullHandlingEnabled = nullHandlingEnabled; } + + public boolean isIntermediateSegmentRecordReader() { + return _isIntermediateSegmentRecordReader; + } + + public void setIntermediateSegmentRecordReader(boolean intermediateSegmentRecordReader) { + _isIntermediateSegmentRecordReader = intermediateSegmentRecordReader; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java new file mode 100644 index 0000000..056a05b --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.indexsegment.mutable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.data.partition.PartitionFunction; +import org.apache.pinot.core.data.partition.PartitionFunctionFactory; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager; +import org.apache.pinot.core.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.core.io.writer.impl.MmapMemoryManager; +import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory; +import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex; +import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer; +import org.apache.pinot.core.segment.index.column.NumValuesInfo; +import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; +import org.apache.pinot.core.segment.index.readers.MutableForwardIndex; +import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader; +import org.apache.pinot.core.startree.v2.StarTreeV2; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.utils.ByteArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IntermediateSegment implements MutableSegment { + private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class); + + private static final int MAX_MULTI_VALUES_PER_ROW = 1000; + private static final int DEFAULT_CAPACITY = 100_000; + private static final int DEFAULT_EST_AVG_COL_SIZE = 32; + private static final int DEFAULT_EST_CARDINALITY = 5000; + + private static final String STATS_FILE_NAME = "segment-stats.ser"; + + private final SegmentGeneratorConfig _segmentGeneratorConfig; + private final Schema _schema; + private final TableConfig _tableConfig; + private final String _segmentName; + private final PartitionFunction _partitionFunction; + private final String _partitionColumn; + private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>(); + private final PinotDataBufferMemoryManager _memoryManager; + + private final int _capacity = DEFAULT_CAPACITY; + private volatile int _numDocsIndexed = 0; + + public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) { + _segmentGeneratorConfig = segmentGeneratorConfig; + _schema = segmentGeneratorConfig.getSchema(); + _tableConfig = segmentGeneratorConfig.getTableConfig(); + _segmentName = _segmentGeneratorConfig.getSegmentName(); + + Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs(); + List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size()); + physicalFieldSpecs.addAll(allFieldSpecs); + Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs); + + SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig(); + if (segmentPartitionConfig != null) { + Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap = + segmentPartitionConfig.getColumnPartitionMap(); + _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next(); + _partitionFunction = PartitionFunctionFactory + .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn), + segmentPartitionConfig.getNumPartitions(_partitionColumn)); + } else { + _partitionColumn = null; + _partitionFunction = null; + } + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation(); + boolean directOffHeap = indexLoadingConfig.isDirectRealtimeOffHeapAllocation(); + if (offHeap && !directOffHeap) { + _memoryManager = new MmapMemoryManager(null, _segmentName, null); + } else { + _memoryManager = new DirectMemoryManager(_segmentName, null); + } + + // Initialize for each column + for (FieldSpec fieldSpec : physicalFieldSpecs1) { + String column = fieldSpec.getName(); + + // Partition info + PartitionFunction partitionFunction = null; + Set<Integer> partitions = null; + if (column.equals(_partitionColumn)) { + partitionFunction = _partitionFunction; + partitions = new HashSet<>(); + partitions.add(segmentGeneratorConfig.getSequenceId()); + } + + FieldSpec.DataType dataType = fieldSpec.getDataType(); + boolean isFixedWidthColumn = dataType.isFixedWidth(); + MutableForwardIndex forwardIndex; + MutableDictionary dictionary; + + int dictionaryColumnSize; + if (isFixedWidthColumn) { + dictionaryColumnSize = dataType.size(); + } else { + dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE; + } + // NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary + int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1); + String dictionaryAllocationContext = + buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION); + dictionary = MutableDictionaryFactory + .getMutableDictionary(dataType, offHeap, _memoryManager, dictionaryColumnSize, + Math.min(estimatedCardinality, _capacity), dictionaryAllocationContext); + + if (fieldSpec.isSingleValueField()) { + // Single-value dictionary-encoded forward index + String allocationContext = + buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION); + forwardIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, _capacity, _memoryManager, + allocationContext); + } else { + // Multi-value dictionary-encoded forward index + String allocationContext = + buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); + // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand + forwardIndex = new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, + indexLoadingConfig.getRealtimeAvgMultiValueCount(), _capacity, Integer.BYTES, _memoryManager, + allocationContext); + } + + _indexContainerMap.put(column, + new IntermediateIndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, + dictionary)); + } + } + + @Override + public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) + throws IOException { + updateDictionary(row); + addNewRow(row); + _numDocsIndexed++; + return true; + } + + @Override + public int getNumDocsIndexed() { + return _numDocsIndexed; + } + + @Override + public String getSegmentName() { + return _segmentName; + } + + @Override + public SegmentMetadata getSegmentMetadata() { + return null; + } + + @Override + public Set<String> getColumnNames() { + return _schema.getColumnNames(); + } + + @Override + public Set<String> getPhysicalColumnNames() { + return _schema.getPhysicalColumnNames(); + } + + @Override + public DataSource getDataSource(String columnName) { + return _indexContainerMap.get(columnName).toDataSource(_numDocsIndexed); + } + + @Override + public List<StarTreeV2> getStarTrees() { + return null; + } + + @Nullable + @Override + public ValidDocIndexReader getValidDocIndex() { + return null; + } + + @Override + public GenericRow getRecord(int docId, GenericRow reuse) { + for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) { + String column = entry.getKey(); + IntermediateIndexContainer indexContainer = entry.getValue(); + Object value = getValue(docId, indexContainer.getForwardIndex(), indexContainer.getDictionary(), + indexContainer.getNumValuesInfo().getMaxNumValuesPerMVEntry()); + reuse.putValue(column, value); + } + return reuse; + } + + @Override + public void destroy() { + String segmentName = getSegmentName(); + LOGGER.info("Trying to destroy segment : {}", segmentName); + for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) { + try { + entry.getValue().close(); + } catch (IOException e) { + LOGGER.error("Failed to close indexes for column: {}. Continuing with error.", entry.getKey(), e); + } + } + } + + private void updateDictionary(GenericRow row) { + for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) { + String column = entry.getKey(); + IntermediateIndexContainer indexContainer = entry.getValue(); + Object value = row.getValue(column); + MutableDictionary dictionary = indexContainer.getDictionary(); + if (dictionary != null) { + if (indexContainer.getFieldSpec().isSingleValueField()) { + indexContainer.setDictId(dictionary.index(value)); + } else { + indexContainer.setDictIds(dictionary.index((Object[]) value)); + } + + // Update min/max value from dictionary + indexContainer.setMinValue(dictionary.getMinVal()); + indexContainer.setMaxValue(dictionary.getMaxVal()); + } + } + } + + private void addNewRow(GenericRow row) + throws IOException { + int docId = _numDocsIndexed; + for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) { + String column = entry.getKey(); + IntermediateIndexContainer indexContainer = entry.getValue(); + Object value = row.getValue(column); + FieldSpec fieldSpec = indexContainer.getFieldSpec(); + if (fieldSpec.isSingleValueField()) { + // Update numValues info + indexContainer.getNumValuesInfo().updateSVEntry(); + + // Update indexes + MutableForwardIndex forwardIndex = indexContainer.getForwardIndex(); + int dictId = indexContainer.getDictId(); + if (dictId >= 0) { + // Dictionary-encoded single-value column + + // Update forward index + forwardIndex.setDictId(docId, dictId); + } else { + // Single-value column with raw index + + // Update forward index + FieldSpec.DataType dataType = fieldSpec.getDataType(); + switch (dataType) { + case INT: + forwardIndex.setInt(docId, (Integer) value); + break; + case LONG: + forwardIndex.setLong(docId, (Long) value); + break; + case FLOAT: + forwardIndex.setFloat(docId, (Float) value); + break; + case DOUBLE: + forwardIndex.setDouble(docId, (Double) value); + break; + case STRING: + forwardIndex.setString(docId, (String) value); + break; + case BYTES: + forwardIndex.setBytes(docId, (byte[]) value); + break; + default: + throw new UnsupportedOperationException( + "Unsupported data type: " + dataType + " for no-dictionary column: " + column); + } + + // Update min/max value from raw value + // NOTE: Skip updating min/max value for aggregated metrics because the value will change over time. + if (fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) { + Comparable comparable; + if (dataType == FieldSpec.DataType.BYTES) { + comparable = new ByteArray((byte[]) value); + } else { + comparable = (Comparable) value; + } + if (indexContainer.getMinValue() == null) { + indexContainer.setMinValue(comparable); + indexContainer.setMaxValue(comparable); + } else { + if (comparable.compareTo(indexContainer.getMinValue()) < 0) { + indexContainer.setMinValue(comparable); + } + if (comparable.compareTo(indexContainer.getMaxValue()) > 0) { + indexContainer.setMaxValue(comparable); + } + } + } + } + } else { + // Multi-value column (always dictionary-encoded) + int[] dictIds = indexContainer.getDictIds(); + + // Update numValues info + indexContainer.getNumValuesInfo().updateMVEntry(dictIds.length); + + // Update forward index + indexContainer.getForwardIndex().setDictIdMV(docId, dictIds); + } + } + } + + private String buildAllocationContext(String segmentName, String columnName, String indexType) { + return segmentName + ":" + columnName + indexType; + } + + /** + * Helper method to read the value for the given document id. + */ + private static Object getValue(int docId, MutableForwardIndex forwardIndex, MutableDictionary dictionary, + int maxNumMultiValues) { + // Dictionary based + if (forwardIndex.isSingleValue()) { + int dictId = forwardIndex.getDictId(docId); + return dictionary.get(dictId); + } else { + int[] dictIds = new int[maxNumMultiValues]; + int numValues = forwardIndex.getDictIdMV(docId, dictIds); + Object[] value = new Object[numValues]; + for (int i = 0; i < numValues; i++) { + value[i] = dictionary.get(dictIds[i]); + } + return value; + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java index 2687ad8..e3612f1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java @@ -79,7 +79,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff PinotDataBuffer buffer = allocateInternal(size, allocationContext); _totalAllocatedBytes += size; _buffers.add(buffer); - _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size); + if (_serverMetrics != null) { + _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size); + } return buffer; } @@ -104,7 +106,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff for (PinotDataBuffer buffer : _buffers) { buffer.close(); } - _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes); + if (_serverMetrics != null) { + _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes); + } doClose(); _buffers.clear(); _totalAllocatedBytes = 0; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java similarity index 97% rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java index 8c4c93a..8b76bda 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java @@ -33,7 +33,7 @@ import org.apache.pinot.spi.data.FieldSpec; * * TODO: Gather more info on the fly to avoid scanning the segment */ -public class RealtimeColumnStatistics implements ColumnStatistics { +public class MutableColumnStatistics implements ColumnStatistics { private final DataSource _dataSource; private final int[] _sortedDocIdIterationOrder; @@ -41,7 +41,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics { // dictionary. private final Dictionary _dictionary; - public RealtimeColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) { + public MutableColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) { _dataSource = dataSource; _sortedDocIdIterationOrder = sortedDocIdIterationOrder; _dictionary = dataSource.getDictionary(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java similarity index 95% rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java index d639b17..109884d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java @@ -28,11 +28,11 @@ import org.apache.pinot.core.segment.creator.ColumnStatistics; import static org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY; -public class RealtimeNoDictionaryColStatistics implements ColumnStatistics { +public class MutableNoDictionaryColStatistics implements ColumnStatistics { private final DataSourceMetadata _dataSourceMetadata; private final MutableForwardIndex _forwardIndex; - public RealtimeNoDictionaryColStatistics(DataSource dataSource) { + public MutableNoDictionaryColStatistics(DataSource dataSource) { _dataSourceMetadata = dataSource.getDataSourceMetadata(); _forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java index bedf50c..d9b382d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java @@ -42,10 +42,10 @@ public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContai for (String columnName : realtimeSegment.getPhysicalColumnNames()) { DataSource dataSource = realtimeSegment.getDataSource(columnName); if (dataSource.getDictionary() != null) { - _columnStatisticsMap.put(columnName, new RealtimeColumnStatistics(realtimeSegment.getDataSource(columnName), + _columnStatisticsMap.put(columnName, new MutableColumnStatistics(realtimeSegment.getDataSource(columnName), realtimeSegmentRecordReader.getSortedDocIdIterationOrder())); } else { - _columnStatisticsMap.put(columnName, new RealtimeNoDictionaryColStatistics(dataSource)); + _columnStatisticsMap.put(columnName, new MutableNoDictionaryColStatistics(dataSource)); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java new file mode 100644 index 0000000..7a27ed5 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.creator; + +import org.apache.pinot.common.Utils; +import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader; +import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IntermediateSegmentSegmentCreationDataSource implements SegmentCreationDataSource { + private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegmentSegmentCreationDataSource.class); + private final IntermediateSegment _intermediateSegment; + private final IntermediateSegmentRecordReader _intermediateSegmentRecordReader; + + + public IntermediateSegmentSegmentCreationDataSource(IntermediateSegmentRecordReader intermediateSegmentRecordReader) { + _intermediateSegmentRecordReader = intermediateSegmentRecordReader; + _intermediateSegment = _intermediateSegmentRecordReader.getIntermediateSegment(); + } + + @Override + public SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig) { + + return new IntermediateSegmentStatsContainer(_intermediateSegment); + } + + @Override + public RecordReader getRecordReader() { + try { + _intermediateSegmentRecordReader.rewind(); + } catch (Exception e) { + LOGGER.error("Caught exception while rewinding record reader", e); + Utils.rethrowException(e); + } + return _intermediateSegmentRecordReader; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java new file mode 100644 index 0000000..3e29d52 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.creator; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment; +import org.apache.pinot.core.realtime.converter.stats.MutableColumnStatistics; + + +public class IntermediateSegmentStatsContainer implements SegmentPreIndexStatsContainer { + private final IntermediateSegment _intermediateSegment; + private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>(); + + public IntermediateSegmentStatsContainer(IntermediateSegment intermediateSegment) { + _intermediateSegment = intermediateSegment; + + // Create all column statistics + for (String columnName : intermediateSegment.getPhysicalColumnNames()) { + DataSource dataSource = intermediateSegment.getDataSource(columnName); + // Always use dictionary for intermediate segment stats + _columnStatisticsMap.put(columnName, new MutableColumnStatistics(dataSource, null)); + } + } + + @Override + public ColumnStatistics getColumnProfileFor(String column) { + return _columnStatisticsMap.get(column); + } + + @Override + public int getTotalDocCount() { + return _intermediateSegment.getNumDocsIndexed(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java index d2b4e52..3c79af4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -602,7 +602,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex)); properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex)); properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField())); - properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS), + properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS), String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())); properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES), String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries())); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 3078dc7..1efabfc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.UUID; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader; import org.apache.pinot.core.data.readers.PinotSegmentRecordReader; import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; import org.apache.pinot.core.data.recordtransformer.RecordTransformer; @@ -39,6 +40,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.segment.creator.ColumnIndexCreationInfo; import org.apache.pinot.core.segment.creator.ColumnStatistics; +import org.apache.pinot.core.segment.creator.IntermediateSegmentSegmentCreationDataSource; import org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.core.segment.creator.SegmentCreationDataSource; import org.apache.pinot.core.segment.creator.SegmentCreator; @@ -136,8 +138,13 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive public void init(SegmentGeneratorConfig config, RecordReader recordReader) throws Exception { - init(config, new RecordReaderSegmentCreationDataSource(recordReader), - CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema())); + SegmentCreationDataSource dataSource; + if (config.isIntermediateSegmentRecordReader()) { + dataSource = new IntermediateSegmentSegmentCreationDataSource((IntermediateSegmentRecordReader) recordReader); + } else { + dataSource = new RecordReaderSegmentCreationDataSource(recordReader); + } + init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema())); } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index b22ceac..82d4ae1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -82,7 +82,7 @@ public class V1Constants { public static final String HAS_FST_INDEX = "hasFSTIndex"; public static final String HAS_JSON_INDEX = "hasJsonIndex"; public static final String IS_SINGLE_VALUED = "isSingleValues"; - public static final String MAX_MULTI_VALUE_ELEMTS = "maxNumberOfMultiValues"; + public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues"; public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries"; public static final String IS_AUTO_GENERATED = "isAutoGenerated"; public static final String DEFAULT_NULL_VALUE = "defaultNullValue"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java new file mode 100644 index 0000000..7c8263f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.column; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.data.partition.PartitionFunction; +import org.apache.pinot.core.segment.index.datasource.MutableDataSource; +import org.apache.pinot.core.segment.index.readers.MutableDictionary; +import org.apache.pinot.core.segment.index.readers.MutableForwardIndex; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IntermediateIndexContainer implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateIndexContainer.class); + + final FieldSpec _fieldSpec; + final PartitionFunction _partitionFunction; + final Set<Integer> _partitions; + final NumValuesInfo _numValuesInfo; + final MutableForwardIndex _forwardIndex; + final MutableDictionary _dictionary; + + volatile Comparable _minValue; + volatile Comparable _maxValue; + + // Hold the dictionary id for the latest record + int _dictId = Integer.MIN_VALUE; + int[] _dictIds; + + public IntermediateIndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction, + @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, + MutableDictionary dictionary) { + _fieldSpec = fieldSpec; + _partitionFunction = partitionFunction; + _partitions = partitions; + _numValuesInfo = numValuesInfo; + _forwardIndex = forwardIndex; + _dictionary = dictionary; + } + + public DataSource toDataSource(int numDocsIndexed) { + return new MutableDataSource(_fieldSpec, numDocsIndexed, _numValuesInfo._numValues, + _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex, + _dictionary, null, null, null, false, null, null, null); + } + + @Override + public void close() + throws IOException { + String column = _fieldSpec.getName(); + try { + _forwardIndex.close(); + } catch (Exception e) { + LOGGER.error("Caught exception while closing forward index for column: {}, continuing with error", column, e); + } + if (_dictionary != null) { + try { + _dictionary.close(); + } catch (Exception e) { + LOGGER.error("Caught exception while closing dictionary for column: {}, continuing with error", column, e); + } + } + } + + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + public NumValuesInfo getNumValuesInfo() { + return _numValuesInfo; + } + + public MutableForwardIndex getForwardIndex() { + return _forwardIndex; + } + + public MutableDictionary getDictionary() { + return _dictionary; + } + + public int getDictId() { + return _dictId; + } + + public void setDictId(int dictId) { + _dictId = dictId; + } + + public int[] getDictIds() { + return _dictIds; + } + + public void setDictIds(int[] dictIds) { + _dictIds = dictIds; + } + + public Comparable getMinValue() { + return _minValue; + } + + public void setMinValue(Comparable minValue) { + _minValue = minValue; + } + + public Comparable getMaxValue() { + return _maxValue; + } + + public void setMaxValue(Comparable maxValue) { + _maxValue = maxValue; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java new file mode 100644 index 0000000..6992c8f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.column; + +public class NumValuesInfo { + volatile int _numValues = 0; + volatile int _maxNumValuesPerMVEntry = 0; + + public void updateSVEntry() { + _numValues++; + } + + public void updateMVEntry(int numValuesInMVEntry) { + _numValues += numValuesInMVEntry; + _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry); + } + + public int getNumValues() { + return _numValues; + } + + public int getMaxNumValuesPerMVEntry() { + return _maxNumValuesPerMVEntry; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index e307aaf..6056f6a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -194,6 +194,9 @@ public class IndexLoadingConfig { } private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataManagerConfig) { + if (instanceDataManagerConfig == null) { + return; + } ReadMode instanceReadMode = instanceDataManagerConfig.getReadMode(); if (instanceReadMode != null) { _readMode = instanceReadMode; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java index 43990ab..b67b0fc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java @@ -100,7 +100,7 @@ public class ColumnMetadata { builder.setHasInvertedIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX))); builder.setHasFSTIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX), false)); builder.setSingleValue(config.getBoolean(getKeyFor(column, IS_SINGLE_VALUED))); - builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS))); + builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS))); builder.setTotalNumberOfEntries(config.getInt(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES))); builder.setAutoGenerated(config.getBoolean(getKeyFor(column, IS_AUTO_GENERATED), false)); builder.setDefaultNullValueString(config.getString(getKeyFor(column, DEFAULT_NULL_VALUE), null)); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java new file mode 100644 index 0000000..1220beb --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.indexsegment; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.index.readers.Dictionary; +import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader; +import org.apache.pinot.segments.v1.creator.SegmentTestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeGranularitySpec; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +public class IntermediateSegmentTest { + private static final String AVRO_DATA_SV = "data/test_data-sv.avro"; // "data/test_data-sv.avro"; + private static final String AVRO_DATA_MV = "data/test_data-mv.avro"; // "data/test_data-sv.avro"; + private static final String SEGMENT_NAME = "testSegmentName"; + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "IntermediateSegmentTest"); + + @BeforeMethod + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + } + + @AfterMethod + public void tearDown() { + FileUtils.deleteQuietly(INDEX_DIR); + } + + @DataProvider(name = "segmentCreationTestCases") + private static Object[][] createSegmentCreationTestCases() + throws IOException { + return new Object[][]{{AVRO_DATA_SV, createSchema(AVRO_DATA_SV), createTableConfig( + AVRO_DATA_SV)}, {AVRO_DATA_MV, createSchema(AVRO_DATA_MV), createTableConfig(AVRO_DATA_MV)}}; + } + + @Test(dataProvider = "segmentCreationTestCases") + public void testOfflineSegmentCreationFromDifferentWays(String inputFile, Schema schema, TableConfig tableConfig) + throws Exception { + // Get resource file path. + URL resource = getClass().getClassLoader().getResource(inputFile); + assertNotNull(resource); + String filePath = resource.getFile(); + + // Create the segment generator config. + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); + segmentGeneratorConfig.setInputFilePath(filePath); + segmentGeneratorConfig.setTableName("testTable"); + segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath()); + segmentGeneratorConfig.setSkipTimeValueCheck(true); + segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6", "column7")); + + IndexSegment segmentFromIntermediateSegment = buildSegmentFromIntermediateSegment(segmentGeneratorConfig); + IndexSegment segmentFromAvroRecordReader = buildSegmentFromAvroRecordReader(segmentGeneratorConfig); + + assertNotNull(segmentFromIntermediateSegment); + assertNotNull(segmentFromAvroRecordReader); + assertEquals(segmentFromIntermediateSegment.getColumnNames(), segmentFromAvroRecordReader.getColumnNames()); + Set<String> physicalColumnsFromIntermediateSegment = segmentFromIntermediateSegment.getPhysicalColumnNames(); + Set<String> physicalColumnsFromAvroSegment = segmentFromAvroRecordReader.getPhysicalColumnNames(); + assertEquals(physicalColumnsFromIntermediateSegment, physicalColumnsFromAvroSegment); + + // Comparison for every columns + for (String column : physicalColumnsFromIntermediateSegment) { + DataSource dataSourceFromIntermediateSegment = segmentFromIntermediateSegment.getDataSource(column); + DataSource dataSourceFromAvroRecordReader = segmentFromAvroRecordReader.getDataSource(column); + + // Comparison for dictionaries. + Dictionary actualDictionary = dataSourceFromIntermediateSegment.getDictionary(); + Dictionary expectedDictionary = dataSourceFromAvroRecordReader.getDictionary(); + assertEquals(actualDictionary.getMinVal(), expectedDictionary.getMinVal()); + assertEquals(actualDictionary.getMaxVal(), expectedDictionary.getMaxVal()); + assertEquals(actualDictionary.getValueType(), expectedDictionary.getValueType()); + assertEquals(actualDictionary.length(), expectedDictionary.length()); + int dictionaryLength = actualDictionary.length(); + for (int i = 0; i < dictionaryLength; i++) { + assertEquals(actualDictionary.get(i), expectedDictionary.get(i)); + } + + // Comparison for inverted index + InvertedIndexReader actualInvertedIndexReader = dataSourceFromIntermediateSegment.getInvertedIndex(); + InvertedIndexReader expectedInvertedIndexReader = dataSourceFromAvroRecordReader.getInvertedIndex(); + if (actualInvertedIndexReader != null) { + for (int j = 0; j < dictionaryLength; j++) { + assertEquals(actualInvertedIndexReader.getDocIds(j), expectedInvertedIndexReader.getDocIds(j)); + } + } + } + } + + private IndexSegment buildSegmentFromIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) + throws Exception { + // Set intermediate segment record reader. + segmentGeneratorConfig.setIntermediateSegmentRecordReader(true); + String segmentName = SEGMENT_NAME + "_from_intermediate_segment"; + segmentGeneratorConfig.setSegmentName(segmentName); + + IntermediateSegment intermediateSegment = new IntermediateSegment(segmentGeneratorConfig); + + // Ingest data. + ingestDataToIntermediateSegment(segmentGeneratorConfig, intermediateSegment); + IntermediateSegmentRecordReader intermediateSegmentRecordReader = + new IntermediateSegmentRecordReader(intermediateSegment); + + // Build the segment from intermediate segment. + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, intermediateSegmentRecordReader); + driver.build(); + + // Destroy intermediate segment after the segment creation. + intermediateSegment.destroy(); + + return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap); + } + + private IndexSegment buildSegmentFromAvroRecordReader(SegmentGeneratorConfig segmentGeneratorConfig) + throws Exception { + // Reset default value to use avro record reader + segmentGeneratorConfig.setIntermediateSegmentRecordReader(false); + String segmentName = SEGMENT_NAME + "_from_avro_reader"; + segmentGeneratorConfig.setSegmentName(segmentName); + + // Build the index segment. + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig); + driver.build(); + + return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap); + } + + private void ingestDataToIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig, + IntermediateSegment intermediateSegment) + throws IOException { + AvroRecordReader avroRecordReader = new AvroRecordReader(); + avroRecordReader.init(new File(segmentGeneratorConfig.getInputFilePath()), null, null); + + GenericRow genericRow = new GenericRow(); + while (avroRecordReader.hasNext()) { + genericRow.clear(); + genericRow = avroRecordReader.next(genericRow); + intermediateSegment.index(genericRow, null); + } + } + + private static Schema createSchema(String inputFile) + throws IOException { + Schema schema; + if (AVRO_DATA_SV.equals(inputFile)) { + schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT) + .addMetric("column3", FieldSpec.DataType.INT).addSingleValueDimension("column5", FieldSpec.DataType.STRING) + .addSingleValueDimension("column6", FieldSpec.DataType.INT) + .addSingleValueDimension("column7", FieldSpec.DataType.INT) + .addSingleValueDimension("column9", FieldSpec.DataType.INT) + .addSingleValueDimension("column11", FieldSpec.DataType.STRING) + .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT) + .addMetric("column18", FieldSpec.DataType.INT) + .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build(); + } else { + URL resource = IntermediateSegmentTest.class.getClassLoader().getResource(inputFile); + assertNotNull(resource); + String filePath = resource.getFile(); + schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(new File(filePath)); + } + return schema; + } + + private static TableConfig createTableConfig(String inputFile) { + TableConfig tableConfig; + if (AVRO_DATA_SV.equals(inputFile)) { + tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch") + .setInvertedIndexColumns(Arrays.asList("column6", "column7", "column11", "column17", "column18")).build(); + } else { + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); + } + return tableConfig; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org