This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fe8d39506b bugfix: use consumerDir during lucene realtime segment conversion (#13094) fe8d39506b is described below commit fe8d39506bdb15f67a4f2d3977ced418d3b8b8b7 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Fri May 10 22:42:07 2024 -0700 bugfix: use consumerDir during lucene realtime segment conversion (#13094) --- .../realtime/provisioning/MemoryEstimator.java | 6 ++++-- .../realtime/RealtimeSegmentDataManagerTest.java | 1 + .../indexsegment/mutable/MutableSegmentImpl.java | 9 +++++++- .../RealtimeSegmentSegmentCreationDataSource.java | 8 +++++++ .../local/realtime/impl/RealtimeSegmentConfig.java | 2 -- .../invertedindex/RealtimeLuceneTextIndex.java | 9 ++++---- .../creator/impl/SegmentColumnarIndexCreator.java | 1 + .../impl/SegmentIndexCreationDriverImpl.java | 1 + .../creator/impl/text/LuceneTextIndexCreator.java | 25 +++++++++++++--------- .../index/readers/text/LuceneTextIndexReader.java | 1 + .../local/segment/index/text/TextIndexType.java | 3 --- .../mutable/MutableSegmentImplTestUtils.java | 7 +++++- .../converter/RealtimeSegmentConverterTest.java | 2 +- .../invertedindex/LuceneMutableTextIndexTest.java | 2 +- .../segment/store/FilePerIndexDirectoryTest.java | 12 +++++++---- .../store/SingleFileIndexDirectoryTest.java | 12 +++++++---- .../apache/pinot/segment/spi/MutableSegment.java | 6 ++++++ .../segment/spi/creator/IndexCreationContext.java | 22 +++++++++++++++++-- .../spi/creator/SegmentGeneratorConfig.java | 10 +++++++++ .../mutable/provider/MutableIndexContext.java | 3 --- 20 files changed, 104 insertions(+), 38 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java index 934b197560..a4b3db45a9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java @@ -163,7 +163,8 @@ public class MemoryEstimator { .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType) .setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs()) .setAvgNumMultiValues(_avgMultiValues).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true) - .setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory); + .setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory) + .setConsumerDir(_workingDir.getAbsolutePath()); // create mutable segment impl MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null); @@ -326,7 +327,8 @@ public class MemoryEstimator { .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType) .setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues) .setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true) - .setMemoryManager(memoryManager).setStatsHistory(statsHistory); + .setMemoryManager(memoryManager).setStatsHistory(statsHistory) + .setConsumerDir(_workingDir.getAbsolutePath()); // create mutable segment impl MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index cc93fcfe77..088e270a32 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -106,6 +106,7 @@ public class RealtimeSegmentDataManagerTest { when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200); when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32); when(tableDataManager.getStatsHistory()).thenReturn(statsHistory); + when(tableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath() + "/consumerDir"); return tableDataManager; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index b336b30f20..38bd1921c6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -140,6 +140,7 @@ public class MutableSegmentImpl implements MutableSegment { private final PartitionFunction _partitionFunction; private final int _mainPartitionId; // partition id designated for this consuming segment private final boolean _nullHandlingEnabled; + private final File _consumerDir; private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>(); @@ -216,6 +217,7 @@ public class MutableSegmentImpl implements MutableSegment { _partitionFunction = config.getPartitionFunction(); _mainPartitionId = config.getPartitionId(); _nullHandlingEnabled = config.isNullHandlingEnabled(); + _consumerDir = new File(config.getConsumerDir()); Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs(); List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size()); @@ -283,7 +285,7 @@ public class MutableSegmentImpl implements MutableSegment { .withEstimatedCardinality(_statsHistory.getEstimatedCardinality(column)) .withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column)) .withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column)) - .withConsumerDir(config.getConsumerDir() != null ? new File(config.getConsumerDir()) : null) + .withConsumerDir(_consumerDir) .withFixedLengthBytes(fixedByteSize).build(); // Partition info @@ -852,6 +854,11 @@ public class MutableSegmentImpl implements MutableSegment { return _numDocsIndexed; } + @Override + public File getConsumerDir() { + return _consumerDir; + } + @Override public String getSegmentName() { return _segmentName; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java index e795dd05a6..a8c521ddbc 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.realtime.converter.stats; +import java.io.File; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource; @@ -48,4 +49,11 @@ public class RealtimeSegmentSegmentCreationDataSource implements SegmentCreation public RecordReader getRecordReader() { return _recordReader; } + + /** + * Returns the consumer directory of the realtime segment + */ + public File getConsumerDir() { + return _mutableSegment.getConsumerDir(); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java index a95a68893c..956135f7ca 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; @@ -179,7 +178,6 @@ public class RealtimeSegmentConfig { return _nullHandlingEnabled; } - @Nullable public String getConsumerDir() { return _consumerDir; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index 8d2e43c8a5..751bff517a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -78,7 +78,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { // for realtime _indexCreator = new LuceneTextIndexCreator(column, new File(segmentIndexDir.getAbsolutePath() + "/" + segmentName), - false /* commitOnClose */, true, null, config); + false /* commitOnClose */, false, null, null, config); IndexWriter indexWriter = _indexCreator.getIndexWriter(); _searcherManager = new SearcherManager(indexWriter, false, false, null); _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer(); @@ -151,9 +151,9 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { return searchFuture.get(); } catch (InterruptedException e) { docIDCollector.markShouldCancel(); - LOGGER.warn("TEXT_MATCH query timeout on realtime consuming segment {}, column {}, search query {}", _segmentName, - _column, searchQuery); - throw new RuntimeException("TEXT_MATCH query timeout on realtime consuming segment"); + LOGGER.warn("TEXT_MATCH query interrupted while querying the consuming segment {}, column {}, search query {}", + _segmentName, _column, searchQuery); + throw new RuntimeException("TEXT_MATCH query interrupted while querying the consuming segment"); } catch (Exception e) { LOGGER.error("Failed while searching the realtime text index for segment {}, column {}, search query {}," + " exception {}", _segmentName, _column, searchQuery, e.getMessage()); @@ -198,6 +198,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { _searcherManager.close(); _searcherManager = null; _indexCreator.close(); + _analyzer.close(); } catch (Exception e) { LOGGER.error("Failed while closing the realtime text index for column {}, exception {}", _column, e.getMessage()); throw new RuntimeException(e); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index 1483191b7f..8406226d85 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -163,6 +163,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { .withTextCommitOnClose(true) .withImmutableToMutableIdMap(immutableToMutableIdMap) .withRealtimeConversion(segmentCreationSpec.isRealtimeConversion()) + .withConsumerDir(segmentCreationSpec.getConsumerDir()) .build(); //@formatter:on diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index ecfea58ca7..f75afb5b15 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -195,6 +195,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive // Optimization for realtime segment conversion if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) { _config.setRealtimeConversion(true); + _config.setConsumerDir(((RealtimeSegmentSegmentCreationDataSource) dataSource).getConsumerDir()); } // Initialize stats collection diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java index c24778ab37..1e9581980d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java @@ -89,6 +89,8 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { * @param segmentIndexDir segment index directory * @param commit true if the index should be committed (at the end after all documents have * been added), false if index should not be committed + * @param realtimeConversion index creator should create an index using the realtime segment + * @param consumerDir consumer dir containing the realtime index, used when realtimeConversion and commit is true * @param immutableToMutableIdMap immutableToMutableIdMap from segment conversion * Note on commit: * Once {@link SegmentColumnarIndexCreator} @@ -106,7 +108,7 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { * @param config the text index config */ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commit, boolean realtimeConversion, - @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) { + @Nullable File consumerDir, @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) { _textColumn = column; _commitOnClose = commit; @@ -144,7 +146,7 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { if (_reuseMutableIndex) { LOGGER.info("Reusing the realtime lucene index for segment {} and column {}", segmentIndexDir, column); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); - convertMutableSegment(segmentIndexDir, immutableToMutableIdMap, indexWriterConfig); + convertMutableSegment(segmentIndexDir, consumerDir, immutableToMutableIdMap, indexWriterConfig); return; } @@ -161,7 +163,7 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { public LuceneTextIndexCreator(IndexCreationContext context, TextIndexConfig indexConfig) { this(context.getFieldSpec().getName(), context.getIndexDir(), context.isTextCommitOnClose(), - context.isRealtimeConversion(), context.getImmutableToMutableIdMap(), indexConfig); + context.isRealtimeConversion(), context.getConsumerDir(), context.getImmutableToMutableIdMap(), indexConfig); } public IndexWriter getIndexWriter() { @@ -174,12 +176,12 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { * @param immutableToMutableIdMap immutableToMutableIdMap from segment conversion * @param indexWriterConfig indexWriterConfig */ - private void convertMutableSegment(File segmentIndexDir, @Nullable int[] immutableToMutableIdMap, + private void convertMutableSegment(File segmentIndexDir, File consumerDir, @Nullable int[] immutableToMutableIdMap, IndexWriterConfig indexWriterConfig) { try { // Copy the mutable index to the v1 index location File dest = getV1TextIndexFile(segmentIndexDir); - File mutableDir = getMutableIndexDir(segmentIndexDir); + File mutableDir = getMutableIndexDir(segmentIndexDir, consumerDir); FileUtils.copyDirectory(mutableDir, dest); // Remove the copied write.lock file @@ -344,12 +346,15 @@ public class LuceneTextIndexCreator extends AbstractTextIndexCreator { return new File(indexDir, luceneIndexDirectory); } - private File getMutableIndexDir(File indexDir) { + private File getMutableIndexDir(File indexDir, File consumerDir) { + String segmentName = getSegmentName(indexDir); + return new File(new File(consumerDir, segmentName), + _textColumn + V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION); + } + + private String getSegmentName(File indexDir) { // tmpSegmentName format: tmp-tableName__9__1__20240227T0254Z-1709002522086 String tmpSegmentName = indexDir.getParentFile().getName(); - String segmentName = tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, tmpSegmentName.lastIndexOf('-')); - String mutableDir = indexDir.getParentFile().getParentFile().getParent() + "/consumers/" + segmentName + "/" - + _textColumn + V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION; - return new File(mutableDir); + return tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, tmpSegmentName.lastIndexOf('-')); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java index 07eb52f88b..ed027903b9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java @@ -188,6 +188,7 @@ public class LuceneTextIndexReader implements TextIndexReader { _indexReader.close(); _indexDirectory.close(); _docIdTranslator.close(); + _analyzer.close(); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java index cfbf6271f1..7a936aac76 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java @@ -194,9 +194,6 @@ public class TextIndexType extends AbstractIndexType<TextIndexConfig, TextIndexR if (config.getFstType() == FSTType.NATIVE) { return new NativeMutableTextIndex(context.getFieldSpec().getName()); } - if (context.getConsumerDir() == null) { - throw new IllegalArgumentException("A consumer directory is required"); - } return new RealtimeLuceneTextIndex(context.getFieldSpec().getName(), context.getConsumerDir(), context.getSegmentName(), config); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java index ade22fcad6..6a8ca27480 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.segment.local.indexsegment.mutable; +import java.io.File; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; @@ -49,6 +52,7 @@ public class MutableSegmentImplTestUtils { private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; private static final String SEGMENT_NAME = "testSegment__0__0__155555"; private static final String STREAM_NAME = "testStream"; + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "MutableSegmentImplTestUtils"); public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, @@ -118,7 +122,8 @@ public class MutableSegmentImplTestUtils { .setPartitionDedupMetadataManager(partitionDedupMetadataManager) .setIngestionAggregationConfigs(aggregationConfigs) .setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord) - .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn); + .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn) + .setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() + "/consumerDir"); for (Map.Entry<String, JsonIndexConfig> entry : jsonIndexConfigs.entrySet()) { segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue()); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java index e4ed4bb396..033acc7dbf 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java @@ -484,7 +484,7 @@ public class RealtimeSegmentConverterTest { .setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName)) .setOffHeap(true).setMemoryManager(new DirectMemoryManager(segmentName)) .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats"))) - .setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath()); + .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath()); // create mutable segment impl RealtimeLuceneTextIndexSearcherPool.init(1); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java index b180fd0c4e..5d1a2440a5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java @@ -96,7 +96,7 @@ public class LuceneMutableTextIndexTest { } @Test(expectedExceptions = ExecutionException.class, - expectedExceptionsMessageRegExp = ".*TEXT_MATCH query timeout on realtime consuming segment.*") + expectedExceptionsMessageRegExp = ".*TEXT_MATCH query interrupted while querying the consuming segment.*") public void testQueryCancellationIsSuccessful() throws InterruptedException, ExecutionException { // Avoid early finalization by not using Executors.newSingleThreadExecutor (java <= 20, JDK-8145304) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java index a385a60b03..97e12bc321 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java @@ -204,8 +204,10 @@ public class FilePerIndexDirectoryTest { TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); - LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, config); - LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, config)) { + LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null, + config); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, null, + config)) { PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 1024); buf.putInt(0, 1); @@ -267,8 +269,10 @@ public class FilePerIndexDirectoryTest { new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); // Write sth to buffers and flush them to index files on disk try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); - LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, config); - LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, config)) { + LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null, + config); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, null, + config)) { PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 1024); buf.putInt(0, 111); buf = fpi.newBuffer("col2", StandardIndexes.dictionary(), 1024); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java index 3a94ceec11..b962550e3b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java @@ -237,8 +237,10 @@ public class SingleFileIndexDirectoryTest { TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); - LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, config); - LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, config)) { + LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null, + config); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, null, + config)) { PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 1024); buf.putInt(0, 1); @@ -343,8 +345,10 @@ public class SingleFileIndexDirectoryTest { TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false); try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); - LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, config); - LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, config)) { + LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null, + config); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true, false, null, null, + config)) { PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 1024); buf.putInt(0, 111); buf = sfd.newBuffer("col2", StandardIndexes.dictionary(), 1024); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java index b675d866e5..7c4d0e1729 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.spi; +import java.io.File; import java.io.IOException; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; @@ -42,4 +43,9 @@ public interface MutableSegment extends IndexSegment { * @return The number of records indexed */ int getNumDocsIndexed(); + + /** + * Returns the consumer dir containing any segment files. + */ + File getConsumerDir(); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java index 3ebe041e87..5d3a1a78ba 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java @@ -97,6 +97,11 @@ public interface IndexCreationContext { */ boolean isRealtimeConversion(); + /** + * Used in conjunction with isRealtimeConversion, this returns the location of the consumer directory used + */ + File getConsumerDir(); + /** * This contains immutableToMutableIdMap mapping generated in {@link SegmentIndexCreationDriver} * @@ -127,6 +132,7 @@ public interface IndexCreationContext { private boolean _fixedLength; private boolean _textCommitOnClose; private boolean _realtimeConversion = false; + private File _consumerDir; private int[] _immutableToMutableIdMap; public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) { @@ -250,6 +256,11 @@ public interface IndexCreationContext { return this; } + public Builder withConsumerDir(File consumerDir) { + _consumerDir = consumerDir; + return this; + } + public Builder withImmutableToMutableIdMap(int[] immutableToMutableIdMap) { _immutableToMutableIdMap = immutableToMutableIdMap; return this; @@ -260,7 +271,7 @@ public interface IndexCreationContext { _maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), _sorted, _cardinality, _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, _maxValue, _forwardIndexDisabled, _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, _textCommitOnClose, _columnStatistics, - _realtimeConversion, _immutableToMutableIdMap); + _realtimeConversion, _consumerDir, _immutableToMutableIdMap); } public Builder withSortedUniqueElementsArray(Object sortedUniqueElementsArray) { @@ -295,6 +306,7 @@ public interface IndexCreationContext { private final boolean _textCommitOnClose; private final ColumnStatistics _columnStatistics; private final boolean _realtimeConversion; + private final File _consumerDir; private final int[] _immutableToMutableIdMap; public Common(File indexDir, int lengthOfLongestEntry, @@ -302,7 +314,7 @@ public interface IndexCreationContext { FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries, int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue, boolean forwardIndexDisabled, Object sortedUniqueElementsArray, boolean optimizeDictionary, boolean fixedLength, - boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean realtimeConversion, + boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean realtimeConversion, File consumerDir, int[] immutableToMutableIdMap) { _indexDir = indexDir; _lengthOfLongestEntry = lengthOfLongestEntry; @@ -324,6 +336,7 @@ public interface IndexCreationContext { _textCommitOnClose = textCommitOnClose; _columnStatistics = columnStatistics; _realtimeConversion = realtimeConversion; + _consumerDir = consumerDir; _immutableToMutableIdMap = immutableToMutableIdMap; } @@ -416,6 +429,11 @@ public interface IndexCreationContext { return _realtimeConversion; } + @Override + public File getConsumerDir() { + return _consumerDir; + } + @Override public int[] getImmutableToMutableIdMap() { return _immutableToMutableIdMap; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index 5381bdc430..c879c1be52 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -121,6 +121,8 @@ public class SegmentGeneratorConfig implements Serializable { private boolean _optimizeDictionaryForMetrics = false; private double _noDictionarySizeRatioThreshold = IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD; private boolean _realtimeConversion = false; + // consumerDir contains data from the consuming segment, and is used during _realtimeConversion optimization + private File _consumerDir; private final Map<String, FieldIndexConfigs> _indexConfigsByColName; // constructed from FieldConfig @@ -732,6 +734,14 @@ public class SegmentGeneratorConfig implements Serializable { _realtimeConversion = realtimeConversion; } + public File getConsumerDir() { + return _consumerDir; + } + + public void setConsumerDir(File consumerDir) { + _consumerDir = consumerDir; + } + public void setNoDictionarySizeRatioThreshold(double noDictionarySizeRatioThreshold) { _noDictionarySizeRatioThreshold = noDictionarySizeRatioThreshold; } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java index ab152c0fd2..57d71aa278 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java @@ -20,7 +20,6 @@ package org.apache.pinot.segment.spi.index.mutable.provider; import java.io.File; import java.util.Objects; -import javax.annotation.Nullable; import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; import org.apache.pinot.spi.data.FieldSpec; @@ -94,7 +93,6 @@ public class MutableIndexContext { return _avgNumMultiValues; } - @Nullable public File getConsumerDir() { return _consumerDir; } @@ -114,7 +112,6 @@ public class MutableIndexContext { private int _estimatedColSize; private int _estimatedCardinality; private int _avgNumMultiValues; - @Nullable private File _consumerDir; public Builder withMemoryManager(PinotDataBufferMemoryManager memoryManager) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org