This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe8d39506b bugfix: use consumerDir during lucene realtime segment conversion  (#13094)
fe8d39506b is described below

commit fe8d39506bdb15f67a4f2d3977ced418d3b8b8b7
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Fri May 10 22:42:07 2024 -0700

    bugfix: use consumerDir during lucene realtime segment conversion  (#13094)
---
 .../realtime/provisioning/MemoryEstimator.java     |  6 ++++--
 .../realtime/RealtimeSegmentDataManagerTest.java   |  1 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |  9 +++++++-
 .../RealtimeSegmentSegmentCreationDataSource.java  |  8 +++++++
 .../local/realtime/impl/RealtimeSegmentConfig.java |  2 --
 .../invertedindex/RealtimeLuceneTextIndex.java     |  9 ++++----
 .../creator/impl/SegmentColumnarIndexCreator.java  |  1 +
 .../impl/SegmentIndexCreationDriverImpl.java       |  1 +
 .../creator/impl/text/LuceneTextIndexCreator.java  | 25 +++++++++++++---------
 .../index/readers/text/LuceneTextIndexReader.java  |  1 +
 .../local/segment/index/text/TextIndexType.java    |  3 ---
 .../mutable/MutableSegmentImplTestUtils.java       |  7 +++++-
 .../converter/RealtimeSegmentConverterTest.java    |  2 +-
 .../invertedindex/LuceneMutableTextIndexTest.java  |  2 +-
 .../segment/store/FilePerIndexDirectoryTest.java   | 12 +++++++----
 .../store/SingleFileIndexDirectoryTest.java        | 12 +++++++----
 .../apache/pinot/segment/spi/MutableSegment.java   |  6 ++++++
 .../segment/spi/creator/IndexCreationContext.java  | 22 +++++++++++++++++--
 .../spi/creator/SegmentGeneratorConfig.java        | 10 +++++++++
 .../mutable/provider/MutableIndexContext.java      |  3 ---
 20 files changed, 104 insertions(+), 38 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index 934b197560..a4b3db45a9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -163,7 +163,8 @@ public class MemoryEstimator {
             
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
             
.setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs())
             
.setAvgNumMultiValues(_avgMultiValues).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
-            
.setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory);
+            
.setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory)
+            .setConsumerDir(_workingDir.getAbsolutePath());
 
     // create mutable segment impl
     MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
@@ -326,7 +327,8 @@ public class MemoryEstimator {
             
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
             
.setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
             .setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
-            .setMemoryManager(memoryManager).setStatsHistory(statsHistory);
+            .setMemoryManager(memoryManager).setStatsHistory(statsHistory)
+            .setConsumerDir(_workingDir.getAbsolutePath());
 
     // create mutable segment impl
     MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index cc93fcfe77..088e270a32 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -106,6 +106,7 @@ public class RealtimeSegmentDataManagerTest {
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
     when(tableDataManager.getStatsHistory()).thenReturn(statsHistory);
+    
when(tableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath() + 
"/consumerDir");
     return tableDataManager;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index b336b30f20..38bd1921c6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -140,6 +140,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final PartitionFunction _partitionFunction;
   private final int _mainPartitionId; // partition id designated for this 
consuming segment
   private final boolean _nullHandlingEnabled;
+  private final File _consumerDir;
 
   private final Map<String, IndexContainer> _indexContainerMap = new 
HashMap<>();
 
@@ -216,6 +217,7 @@ public class MutableSegmentImpl implements MutableSegment {
     _partitionFunction = config.getPartitionFunction();
     _mainPartitionId = config.getPartitionId();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
+    _consumerDir = new File(config.getConsumerDir());
 
     Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
     List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -283,7 +285,7 @@ public class MutableSegmentImpl implements MutableSegment {
               
.withEstimatedCardinality(_statsHistory.getEstimatedCardinality(column))
               
.withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column))
               
.withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column))
-              .withConsumerDir(config.getConsumerDir() != null ? new 
File(config.getConsumerDir()) : null)
+              .withConsumerDir(_consumerDir)
               .withFixedLengthBytes(fixedByteSize).build();
 
       // Partition info
@@ -852,6 +854,11 @@ public class MutableSegmentImpl implements MutableSegment {
     return _numDocsIndexed;
   }
 
+  @Override
+  public File getConsumerDir() {
+    return _consumerDir;
+  }
+
   @Override
   public String getSegmentName() {
     return _segmentName;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
index e795dd05a6..a8c521ddbc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.realtime.converter.stats;
 
+import java.io.File;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource;
@@ -48,4 +49,11 @@ public class RealtimeSegmentSegmentCreationDataSource 
implements SegmentCreation
   public RecordReader getRecordReader() {
     return _recordReader;
   }
+
+  /**
+   * Returns the consumer directory of the realtime segment
+   */
+  public File getConsumerDir() {
+    return _mutableSegment.getConsumerDir();
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index a95a68893c..956135f7ca 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -179,7 +178,6 @@ public class RealtimeSegmentConfig {
     return _nullHandlingEnabled;
   }
 
-  @Nullable
   public String getConsumerDir() {
     return _consumerDir;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 8d2e43c8a5..751bff517a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -78,7 +78,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       // for realtime
       _indexCreator =
           new LuceneTextIndexCreator(column, new 
File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
-              false /* commitOnClose */, true, null, config);
+              false /* commitOnClose */, false, null, null, config);
       IndexWriter indexWriter = _indexCreator.getIndexWriter();
       _searcherManager = new SearcherManager(indexWriter, false, false, null);
       _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
@@ -151,9 +151,9 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       return searchFuture.get();
     } catch (InterruptedException e) {
       docIDCollector.markShouldCancel();
-      LOGGER.warn("TEXT_MATCH query timeout on realtime consuming segment {}, 
column {}, search query {}", _segmentName,
-          _column, searchQuery);
-      throw new RuntimeException("TEXT_MATCH query timeout on realtime 
consuming segment");
+      LOGGER.warn("TEXT_MATCH query interrupted while querying the consuming 
segment {}, column {}, search query {}",
+          _segmentName, _column, searchQuery);
+      throw new RuntimeException("TEXT_MATCH query interrupted while querying 
the consuming segment");
     } catch (Exception e) {
       LOGGER.error("Failed while searching the realtime text index for segment 
{}, column {}, search query {},"
               + " exception {}", _segmentName, _column, searchQuery, 
e.getMessage());
@@ -198,6 +198,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       _searcherManager.close();
       _searcherManager = null;
       _indexCreator.close();
+      _analyzer.close();
     } catch (Exception e) {
       LOGGER.error("Failed while closing the realtime text index for column 
{}, exception {}", _column, e.getMessage());
       throw new RuntimeException(e);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 1483191b7f..8406226d85 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -163,6 +163,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
           .withTextCommitOnClose(true)
           .withImmutableToMutableIdMap(immutableToMutableIdMap)
           .withRealtimeConversion(segmentCreationSpec.isRealtimeConversion())
+          .withConsumerDir(segmentCreationSpec.getConsumerDir())
           .build();
       //@formatter:on
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ecfea58ca7..f75afb5b15 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -195,6 +195,7 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     // Optimization for realtime segment conversion
     if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) {
       _config.setRealtimeConversion(true);
+      _config.setConsumerDir(((RealtimeSegmentSegmentCreationDataSource) 
dataSource).getConsumerDir());
     }
 
     // Initialize stats collection
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index c24778ab37..1e9581980d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -89,6 +89,8 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
    * @param segmentIndexDir segment index directory
    * @param commit true if the index should be committed (at the end after all 
documents have
    *               been added), false if index should not be committed
+   * @param realtimeConversion index creator should create an index using the 
realtime segment
+   * @param consumerDir consumer dir containing the realtime index, used when 
realtimeConversion and commit is true
    * @param immutableToMutableIdMap immutableToMutableIdMap from segment 
conversion
    * Note on commit:
    *               Once {@link SegmentColumnarIndexCreator}
@@ -106,7 +108,7 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
    * @param config the text index config
    */
   public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean 
commit, boolean realtimeConversion,
-      @Nullable int[] immutableToMutableIdMap, TextIndexConfig config) {
+      @Nullable File consumerDir, @Nullable int[] immutableToMutableIdMap, 
TextIndexConfig config) {
     _textColumn = column;
     _commitOnClose = commit;
 
@@ -144,7 +146,7 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
       if (_reuseMutableIndex) {
         LOGGER.info("Reusing the realtime lucene index for segment {} and 
column {}", segmentIndexDir, column);
         
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
-        convertMutableSegment(segmentIndexDir, immutableToMutableIdMap, 
indexWriterConfig);
+        convertMutableSegment(segmentIndexDir, consumerDir, 
immutableToMutableIdMap, indexWriterConfig);
         return;
       }
 
@@ -161,7 +163,7 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
 
   public LuceneTextIndexCreator(IndexCreationContext context, TextIndexConfig 
indexConfig) {
     this(context.getFieldSpec().getName(), context.getIndexDir(), 
context.isTextCommitOnClose(),
-        context.isRealtimeConversion(), context.getImmutableToMutableIdMap(), 
indexConfig);
+        context.isRealtimeConversion(), context.getConsumerDir(), 
context.getImmutableToMutableIdMap(), indexConfig);
   }
 
   public IndexWriter getIndexWriter() {
@@ -174,12 +176,12 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
    * @param immutableToMutableIdMap immutableToMutableIdMap from segment 
conversion
    * @param indexWriterConfig indexWriterConfig
    */
-  private void convertMutableSegment(File segmentIndexDir, @Nullable int[] 
immutableToMutableIdMap,
+  private void convertMutableSegment(File segmentIndexDir, File consumerDir, 
@Nullable int[] immutableToMutableIdMap,
       IndexWriterConfig indexWriterConfig) {
     try {
       // Copy the mutable index to the v1 index location
       File dest = getV1TextIndexFile(segmentIndexDir);
-      File mutableDir = getMutableIndexDir(segmentIndexDir);
+      File mutableDir = getMutableIndexDir(segmentIndexDir, consumerDir);
       FileUtils.copyDirectory(mutableDir, dest);
 
       // Remove the copied write.lock file
@@ -344,12 +346,15 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
     return new File(indexDir, luceneIndexDirectory);
   }
 
-  private File getMutableIndexDir(File indexDir) {
+  private File getMutableIndexDir(File indexDir, File consumerDir) {
+    String segmentName = getSegmentName(indexDir);
+    return new File(new File(consumerDir, segmentName),
+        _textColumn + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
+  }
+
+  private String getSegmentName(File indexDir) {
     // tmpSegmentName format: tmp-tableName__9__1__20240227T0254Z-1709002522086
     String tmpSegmentName = indexDir.getParentFile().getName();
-    String segmentName = 
tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, 
tmpSegmentName.lastIndexOf('-'));
-    String mutableDir = indexDir.getParentFile().getParentFile().getParent() + 
"/consumers/" + segmentName + "/"
-        + _textColumn + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION;
-    return new File(mutableDir);
+    return tmpSegmentName.substring(tmpSegmentName.indexOf("tmp-") + 4, 
tmpSegmentName.lastIndexOf('-'));
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
index 07eb52f88b..ed027903b9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneTextIndexReader.java
@@ -188,6 +188,7 @@ public class LuceneTextIndexReader implements 
TextIndexReader {
     _indexReader.close();
     _indexDirectory.close();
     _docIdTranslator.close();
+    _analyzer.close();
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
index cfbf6271f1..7a936aac76 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
@@ -194,9 +194,6 @@ public class TextIndexType extends 
AbstractIndexType<TextIndexConfig, TextIndexR
     if (config.getFstType() == FSTType.NATIVE) {
       return new NativeMutableTextIndex(context.getFieldSpec().getName());
     }
-    if (context.getConsumerDir() == null) {
-      throw new IllegalArgumentException("A consumer directory is required");
-    }
     return new RealtimeLuceneTextIndex(context.getFieldSpec().getName(), 
context.getConsumerDir(),
         context.getSegmentName(), config);
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index ade22fcad6..6a8ca27480 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -49,6 +52,7 @@ public class MutableSegmentImplTestUtils {
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
   private static final String STREAM_NAME = "testStream";
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"MutableSegmentImplTestUtils");
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
@@ -118,7 +122,8 @@ public class MutableSegmentImplTestUtils {
         .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
         .setIngestionAggregationConfigs(aggregationConfigs)
         .setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
-        .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn);
+        .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn)
+        .setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() + 
"/consumerDir");
     for (Map.Entry<String, JsonIndexConfig> entry : 
jsonIndexConfigs.entrySet()) {
       segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), 
entry.getValue());
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index e4ed4bb396..033acc7dbf 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -484,7 +484,7 @@ public class RealtimeSegmentConverterTest {
             
.setFieldConfigList(fieldConfigList).setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
             .setOffHeap(true).setMemoryManager(new 
DirectMemoryManager(segmentName))
             .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new 
File(tmpDir, "stats")))
-            .setConsumerDir(new File(tmpDir, "consumers").getAbsolutePath());
+            .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
 
     // create mutable segment impl
     RealtimeLuceneTextIndexSearcherPool.init(1);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index b180fd0c4e..5d1a2440a5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -96,7 +96,7 @@ public class LuceneMutableTextIndexTest {
   }
 
   @Test(expectedExceptions = ExecutionException.class,
-      expectedExceptionsMessageRegExp = ".*TEXT_MATCH query timeout on 
realtime consuming segment.*")
+      expectedExceptionsMessageRegExp = ".*TEXT_MATCH query interrupted while 
querying the consuming segment.*")
   public void testQueryCancellationIsSuccessful()
       throws InterruptedException, ExecutionException {
     // Avoid early finalization by not using Executors.newSingleThreadExecutor 
(java <= 20, JDK-8145304)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index a385a60b03..97e12bc321 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -204,8 +204,10 @@ public class FilePerIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, null,
+            config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, null,
+            config)) {
       PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 1);
 
@@ -267,8 +269,10 @@ public class FilePerIndexDirectoryTest {
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     // Write sth to buffers and flush them to index files on disk
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, null,
+            config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, null,
+            config)) {
       PinotDataBuffer buf = fpi.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 111);
       buf = fpi.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 3a94ceec11..b962550e3b 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -237,8 +237,10 @@ public class SingleFileIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, null,
+            config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, null,
+            config)) {
       PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 1);
 
@@ -343,8 +345,10 @@ public class SingleFileIndexDirectoryTest {
     TextIndexConfig config =
             new TextIndexConfig(false, null, null, false, false, null, null, 
true, 500, null, false);
     try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, config);
-        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, config)) {
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true, false, null, null,
+            config);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true, false, null, null,
+            config)) {
       PinotDataBuffer buf = sfd.newBuffer("col1", StandardIndexes.forward(), 
1024);
       buf.putInt(0, 111);
       buf = sfd.newBuffer("col2", StandardIndexes.dictionary(), 1024);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
index b675d866e5..7c4d0e1729 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/MutableSegment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.spi;
 
+import java.io.File;
 import java.io.IOException;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -42,4 +43,9 @@ public interface MutableSegment extends IndexSegment {
    * @return The number of records indexed
    */
   int getNumDocsIndexed();
+
+  /**
+   * Returns the consumer dir containing any segment files.
+   */
+  File getConsumerDir();
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 3ebe041e87..5d3a1a78ba 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -97,6 +97,11 @@ public interface IndexCreationContext {
    */
   boolean isRealtimeConversion();
 
+  /**
+   * Used in conjunction with isRealtimeConversion, this returns the location 
of the consumer directory used
+   */
+  File getConsumerDir();
+
   /**
    * This contains immutableToMutableIdMap mapping generated in {@link 
SegmentIndexCreationDriver}
    *
@@ -127,6 +132,7 @@ public interface IndexCreationContext {
     private boolean _fixedLength;
     private boolean _textCommitOnClose;
     private boolean _realtimeConversion = false;
+    private File _consumerDir;
     private int[] _immutableToMutableIdMap;
 
     public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo 
columnIndexCreationInfo) {
@@ -250,6 +256,11 @@ public interface IndexCreationContext {
       return this;
     }
 
+    public Builder withConsumerDir(File consumerDir) {
+      _consumerDir = consumerDir;
+      return this;
+    }
+
     public Builder withImmutableToMutableIdMap(int[] immutableToMutableIdMap) {
       _immutableToMutableIdMap = immutableToMutableIdMap;
       return this;
@@ -260,7 +271,7 @@ public interface IndexCreationContext {
           _maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), 
_sorted, _cardinality,
           _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, 
_maxValue, _forwardIndexDisabled,
           _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, 
_textCommitOnClose, _columnStatistics,
-          _realtimeConversion, _immutableToMutableIdMap);
+          _realtimeConversion, _consumerDir, _immutableToMutableIdMap);
     }
 
     public Builder withSortedUniqueElementsArray(Object 
sortedUniqueElementsArray) {
@@ -295,6 +306,7 @@ public interface IndexCreationContext {
     private final boolean _textCommitOnClose;
     private final ColumnStatistics _columnStatistics;
     private final boolean _realtimeConversion;
+    private final File _consumerDir;
     private final int[] _immutableToMutableIdMap;
 
     public Common(File indexDir, int lengthOfLongestEntry,
@@ -302,7 +314,7 @@ public interface IndexCreationContext {
         FieldSpec fieldSpec, boolean sorted, int cardinality, int 
totalNumberOfEntries,
         int totalDocs, boolean hasDictionary, Comparable<?> minValue, 
Comparable<?> maxValue,
         boolean forwardIndexDisabled, Object sortedUniqueElementsArray, 
boolean optimizeDictionary, boolean fixedLength,
-        boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean 
realtimeConversion,
+        boolean textCommitOnClose, ColumnStatistics columnStatistics, boolean 
realtimeConversion, File consumerDir,
         int[] immutableToMutableIdMap) {
       _indexDir = indexDir;
       _lengthOfLongestEntry = lengthOfLongestEntry;
@@ -324,6 +336,7 @@ public interface IndexCreationContext {
       _textCommitOnClose = textCommitOnClose;
       _columnStatistics = columnStatistics;
       _realtimeConversion = realtimeConversion;
+      _consumerDir = consumerDir;
       _immutableToMutableIdMap = immutableToMutableIdMap;
     }
 
@@ -416,6 +429,11 @@ public interface IndexCreationContext {
       return _realtimeConversion;
     }
 
+    @Override
+    public File getConsumerDir() {
+      return _consumerDir;
+    }
+
     @Override
     public int[] getImmutableToMutableIdMap() {
       return _immutableToMutableIdMap;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 5381bdc430..c879c1be52 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -121,6 +121,8 @@ public class SegmentGeneratorConfig implements Serializable {
   private boolean _optimizeDictionaryForMetrics = false;
   private double _noDictionarySizeRatioThreshold = 
IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD;
   private boolean _realtimeConversion = false;
+  // consumerDir contains data from the consuming segment, and is used during 
_realtimeConversion optimization
+  private File _consumerDir;
   private final Map<String, FieldIndexConfigs> _indexConfigsByColName;
 
   // constructed from FieldConfig
@@ -732,6 +734,14 @@ public class SegmentGeneratorConfig implements Serializable {
     _realtimeConversion = realtimeConversion;
   }
 
+  public File getConsumerDir() {
+    return _consumerDir;
+  }
+
+  public void setConsumerDir(File consumerDir) {
+    _consumerDir = consumerDir;
+  }
+
   public void setNoDictionarySizeRatioThreshold(double 
noDictionarySizeRatioThreshold) {
     _noDictionarySizeRatioThreshold = noDictionarySizeRatioThreshold;
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
index ab152c0fd2..57d71aa278 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.spi.index.mutable.provider;
 
 import java.io.File;
 import java.util.Objects;
-import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.spi.data.FieldSpec;
 
@@ -94,7 +93,6 @@ public class MutableIndexContext {
     return _avgNumMultiValues;
   }
 
-  @Nullable
   public File getConsumerDir() {
     return _consumerDir;
   }
@@ -114,7 +112,6 @@ public class MutableIndexContext {
     private int _estimatedColSize;
     private int _estimatedCardinality;
     private int _avgNumMultiValues;
-    @Nullable
     private File _consumerDir;
 
     public Builder withMemoryManager(PinotDataBufferMemoryManager 
memoryManager) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to