This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de57b01eb For consuming segment, avoid using setter in IndexLoadingConfig (#14190)
9de57b01eb is described below

commit 9de57b01eb1502ea3851f15edd6c2aa52f50b698
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Oct 9 11:47:48 2024 -0700

    For consuming segment, avoid using setter in IndexLoadingConfig (#14190)
---
 .../realtime/RealtimeSegmentDataManager.java       | 14 ++-----
 .../tests/BaseClusterIntegrationTest.java          |  7 ++++
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 33 +++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       | 49 ++++++++++------------
 .../converter/RealtimeSegmentConverter.java        |  3 --
 .../local/realtime/impl/RealtimeSegmentConfig.java | 23 +++++++---
 .../segment/index/loader/IndexLoadingConfig.java   |  8 ----
 7 files changed, 82 insertions(+), 55 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index ecf5cb12cd..bed8f2a310 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -118,6 +118,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Segment data manager for low level consumer realtime segments, which 
manages consumption and segment completion.
  */
+@SuppressWarnings("jol")
 public class RealtimeSegmentDataManager extends SegmentDataManager {
 
   @VisibleForTesting
@@ -237,7 +238,6 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private final StreamDataDecoder _streamDataDecoder;
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
-  private final IndexLoadingConfig _indexLoadingConfig;
   private final Schema _schema;
   // Semaphore for each partitionGroupId only, which is to prevent two 
different stream consumers
   // from consuming with the same partitionGroupId in parallel in the same 
host.
@@ -1446,7 +1446,6 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
-    _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
@@ -1478,7 +1477,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
-    InstanceDataManagerConfig instanceDataManagerConfig = 
_indexLoadingConfig.getInstanceDataManagerConfig();
+    InstanceDataManagerConfig instanceDataManagerConfig = 
indexLoadingConfig.getInstanceDataManagerConfig();
     String clientIdSuffix =
         instanceDataManagerConfig != null ? 
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
     if (StringUtils.isNotBlank(clientIdSuffix)) {
@@ -1488,7 +1487,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     }
     _segmentLogger = 
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + 
_segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + streamTopic;
-    if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && 
!_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
+    if (indexLoadingConfig.isRealtimeOffHeapAllocation() && 
!indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
       _memoryManager =
           new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), 
_segmentNameStr, _serverMetrics);
     } else {
@@ -1526,13 +1525,6 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         sortedColumn = null;
       }
     }
-    // Inverted index columns
-    // We need to add sorted column into inverted index columns because when 
we convert realtime in memory segment into
-    // offline segment, we use sorted column's inverted index to maintain the 
order of the records so that the records
-    // are sorted on the sorted column.
-    if (sortedColumn != null) {
-      indexLoadingConfig.addInvertedIndexColumns(sortedColumn);
-    }
 
     // Read the max number of rows
     int segmentMaxRowCount = 
segmentZKMetadata.getSizeThresholdToFlushSegment();
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 591a0308dd..ffe846cf9c 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -759,4 +759,11 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected long getLongCellValue(JsonNode jsonNode, int colIndex, int 
rowIndex) {
     return getCellValue(jsonNode, colIndex, rowIndex, 
JsonNode::asLong).longValue();
   }
+
+  protected JsonNode getColumnIndexSize(String column)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(
+            
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
 List.of(column))))
+        .get("columnIndexSizeMap").get(column);
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 31dd2d26a3..78b34fc563 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -49,6 +49,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
 import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
 import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -73,6 +74,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -321,6 +323,37 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
     testReload(false);
   }
 
+  @Test
+  public void testSortedColumn()
+      throws Exception {
+    // There should be no inverted index or range index sealed because the 
sorted column is not configured with them
+    JsonNode columnIndexSize = getColumnIndexSize(getSortedColumn());
+    assertFalse(columnIndexSize.has(StandardIndexes.INVERTED_ID));
+    assertFalse(columnIndexSize.has(StandardIndexes.RANGE_ID));
+
+    // For point lookup query, there should be no scan from the 
committed/consuming segments, but full scan from the
+    // uploaded segments:
+    // - Committed segments have sorted index
+    // - Consuming segments have inverted index
+    // - Uploaded segments have neither of them
+    String query = "SELECT COUNT(*) FROM myTable WHERE Carrier = 'DL'";
+    JsonNode response = postQuery(query);
+    long numEntriesScannedInFilter = 
response.get("numEntriesScannedInFilter").asLong();
+    long numDocsInUploadedSegments = super.getCountStarResult();
+    assertEquals(numEntriesScannedInFilter, numDocsInUploadedSegments);
+
+    // For range query, there should be no scan from the committed segments, 
but full scan from the uploaded/consuming
+    // segments:
+    // - Committed segments have sorted index
+    // - Consuming/Uploaded segments do not have sorted index
+    query = "SELECT COUNT(*) FROM myTable WHERE Carrier > 'DL'";
+    response = postQuery(query);
+    numEntriesScannedInFilter = 
response.get("numEntriesScannedInFilter").asLong();
+    // NOTE: If this test is running after force commit test, there will be no 
records in consuming segments
+    assertTrue(numEntriesScannedInFilter >= numDocsInUploadedSegments);
+    assertTrue(numEntriesScannedInFilter < 2 * numDocsInUploadedSegments);
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testAddRemoveDictionaryAndInvertedIndex(boolean 
useMultiStageQueryEngine)
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 97f39f66a4..25a75352f7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1324,10 +1324,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       throws Exception {
     String column = "DestCityName";
     JsonNode columnIndexSize = getColumnIndexSize(column);
-    assertTrue(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    double dictionarySize = columnIndexSize.get("dictionary").asDouble();
-    double forwardIndexSize = columnIndexSize.get("forward_index").asDouble();
+    assertTrue(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    double dictionarySize = 
columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble();
+    double forwardIndexSize = 
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
 
     // Convert 'DestCityName' to raw index
     TableConfig tableConfig = getOfflineTableConfig();
@@ -1339,9 +1339,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     long numTotalDocs = getCountStarResult();
     reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
     columnIndexSize = getColumnIndexSize(column);
-    assertFalse(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble();
+    assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    double v2rawIndexSize = 
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
     assertTrue(v2rawIndexSize > forwardIndexSize);
 
     // NOTE: Currently Pinot doesn't support directly changing raw index 
version, so we need to first reset it back to
@@ -1361,9 +1361,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
     columnIndexSize = getColumnIndexSize(column);
-    assertFalse(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble();
+    assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    double v4RawIndexSize = 
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
     assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize > 
forwardIndexSize);
 
     // Convert 'DestCityName' to SNAPPY compression
@@ -1377,9 +1377,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
     columnIndexSize = getColumnIndexSize(column);
-    assertFalse(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    double v4SnappyRawIndexSize = 
columnIndexSize.get("forward_index").asDouble();
+    assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    double v4SnappyRawIndexSize = 
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
     assertTrue(v4SnappyRawIndexSize > v2rawIndexSize);
 
     // Removing FieldConfig should be no-op because compression is not 
explicitly set
@@ -1387,9 +1387,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
     columnIndexSize = getColumnIndexSize(column);
-    assertFalse(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
v4SnappyRawIndexSize);
+    assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), 
v4SnappyRawIndexSize);
 
     // Adding 'LZ4' compression explicitly should trigger the conversion
     forwardIndexConfig = new 
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build();
@@ -1400,28 +1400,21 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
     columnIndexSize = getColumnIndexSize(column);
-    assertFalse(columnIndexSize.has("dictionary"));
-    assertTrue(columnIndexSize.has("forward_index"));
-    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
v2rawIndexSize);
+    assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+    assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+    assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), 
v2rawIndexSize);
 
     resetForwardIndex(dictionarySize, forwardIndexSize);
   }
 
-  private JsonNode getColumnIndexSize(String column)
-      throws Exception {
-    return JsonUtils.stringToJsonNode(
-            
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
 List.of(column))))
-        .get("columnIndexSizeMap").get(column);
-  }
-
   private void resetForwardIndex(double expectedDictionarySize, double 
expectedForwardIndexSize)
       throws Exception {
     TableConfig tableConfig = createOfflineTableConfig();
     updateTableConfig(tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
     JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
-    assertEquals(columnIndexSize.get("dictionary").asDouble(), 
expectedDictionarySize);
-    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
expectedForwardIndexSize);
+    
assertEquals(columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(), 
expectedDictionarySize);
+    assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), 
expectedForwardIndexSize);
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 2082f35622..65c69682f0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -63,9 +63,6 @@ public class RealtimeSegmentConverter {
     _segmentZKPropsConfig = segmentZKPropsConfig;
     _outputPath = outputPath;
     _columnIndicesForRealtimeTable = cdc;
-    if (cdc.getSortedColumn() != null) {
-      
_columnIndicesForRealtimeTable.getInvertedIndexColumns().remove(cdc.getSortedColumn());
-    }
     _dataSchema = getUpdatedSchema(schema);
     _tableName = tableName;
     _tableConfig = tableConfig;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 8d196eb645..5b3aeb26d5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -23,14 +23,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
 import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -279,18 +281,29 @@ public class RealtimeSegmentConfig {
     }
 
     public Builder(IndexLoadingConfig indexLoadingConfig) {
-      this(indexLoadingConfig.getFieldIndexConfigByColName());
+      this(indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getSortedColumns());
     }
 
     public Builder(TableConfig tableConfig, Schema schema) {
-      this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, 
schema));
+      this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, 
schema),
+          tableConfig.getIndexingConfig().getSortedColumn());
     }
 
-    public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName) {
-      _indexConfigByCol = new 
HashMap<>(HashUtil.getHashMapCapacity(indexConfigsByColName.size()));
+    public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName, 
@Nullable List<String> sortedColumns) {
+      _indexConfigByCol = 
Maps.newHashMapWithExpectedSize(indexConfigsByColName.size());
       for (Map.Entry<String, FieldIndexConfigs> entry : 
indexConfigsByColName.entrySet()) {
         _indexConfigByCol.put(entry.getKey(), new 
FieldIndexConfigs.Builder(entry.getValue()));
       }
+      // Add inverted index to sorted column for 2 reasons:
+      // 1. Since sorted index doesn't apply to mutable segment, add inverted 
index to get better performance
+      // 2. When converting mutable segment to immutable segment, we use 
sorted column's inverted index to accelerate
+      //    the index creation
+      if (CollectionUtils.isNotEmpty(sortedColumns)) {
+        String sortedColumn = sortedColumns.get(0);
+        FieldIndexConfigs.Builder builder =
+            _indexConfigByCol.computeIfAbsent(sortedColumn, k -> new 
FieldIndexConfigs.Builder());
+        builder.add(StandardIndexes.inverted(), new IndexConfig(false));
+      }
     }
 
     public Builder setTableNameWithType(String tableNameWithType) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index c24c286000..0684fe9097 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.segment.local.segment.index.loader;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -462,13 +461,6 @@ public class IndexLoadingConfig {
     _dirty = true;
   }
 
-  @Deprecated
-  @VisibleForTesting
-  public void addInvertedIndexColumns(String... invertedIndexColumns) {
-    _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns));
-    _dirty = true;
-  }
-
   @Deprecated
   @VisibleForTesting
   public void addNoDictionaryColumns(Collection<String> noDictionaryColumns) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to