This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7d220d3ee Create Cluster conf for No dict stats collector (#16967)
fb7d220d3ee is described below

commit fb7d220d3ee70206187f8ab82639b34e436d3593
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Oct 8 15:46:28 2025 +0530

    Create Cluster conf for No dict stats collector (#16967)
    
    * Add ingestion skip filter for upsert SRT task
    
    * Optimize index stats collector for no-dictionary columns
    
    * Test case fixes
    
    * Add a no-dictionary stats collector so that no-dictionary columns can be
    optimized without changing each existing collector
    
    * Add a no-dictionary stats collector so that no-dictionary columns can be
    optimized without changing each existing collector
    
    * Checkstyle fixes
    
    * Fix SegmentPreProcessorTest
    
    * Linter fix
    
    * Add support for native memory arrays
    
    * Add ULL to approximate cardinality in NoDictColumnStatisticsCollector
    
    * Revert "Fix SegmentPreProcessorTest"
    
    This reverts commit 1fed6a27f0f2d8894814f24035d9a9feac6a495c.
    
    * Fix tests after ULL approximation
    
    * Checkstyle fixes
    
    * Add tests for no-dictionary columns
    
    * Checkstyle fixes
    
    * Add support for map type
    
    * Checkstyle fixes
    
    * 1. Add a config to disable no-dictionary column stats collection.
    2. Add a 5% buffer to the cardinality estimate and update tests
    
    * Fix test case
    
    * Optimize MAP support for no-dictionary columns
    
    * Add documentation
    
    * Update bits per element when cardinality changes
    
    * Update cardinality from actual stats collector
    
    * Checkstyle fixes
    
    * Test case fixes
    
    * Add cluster config to choose the stats collector for no-dictionary columns
    
    * Use HyperLogLog++ (HLL++) instead of ULL, as it generally returns an
    approximate cardinality >= the actual cardinality. Disable NoDictStatsCollector by default.
    
    * Checkstyle fixes
    
    * Fix table config flag
    
    * Fix tests after increasing the cardinality estimation buffer to 10%
    
    * Address PR comments
    
    * Create ClusterConfigForTable
    
    * Register DefaultClusterConfigChangeHandler in minion
    
    * Rename optimise to optimize
---
 .../org/apache/pinot/minion/BaseMinionStarter.java | 14 ++++
 .../stats/SegmentPreIndexStatsCollectorImpl.java   |  3 +-
 .../segment/index/loader/ForwardIndexHandler.java  |  3 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  3 +-
 .../segment/local/utils/ClusterConfigForTable.java | 87 ++++++++++++++++++++++
 .../server/starter/helix/BaseServerStarter.java    |  5 ++
 6 files changed, 112 insertions(+), 3 deletions(-)

diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index a5cffb1a607..a783e2dda5e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -39,6 +39,7 @@ import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.MinionGauge;
 import org.apache.pinot.common.metrics.MinionMeter;
@@ -64,6 +65,7 @@ import 
org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
 import org.apache.pinot.minion.taskfactory.TaskFactoryRegistry;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -99,6 +101,7 @@ public abstract class BaseMinionStarter implements 
ServiceStartable {
   protected MinionAdminApiApplication _minionAdminApplication;
   protected List<ListenerConfig> _listenerConfigs;
   protected ExecutorService _executorService;
+  protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
 
   @Override
   public void init(PinotConfiguration config)
@@ -124,6 +127,11 @@ public abstract class BaseMinionStarter implements 
ServiceStartable {
     }
     _listenerConfigs = ListenerConfigUtil.buildMinionConfigs(_config);
     _tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, -1);
+    _clusterConfigChangeHandler = new DefaultClusterConfigChangeHandler();
+    // Register cluster-level override for table configs
+    _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+        new ClusterConfigForTable.ConfigChangeListener());
+    LOGGER.info("Registered ClusterConfigForTable change listener");
     _helixManager = new ZKHelixManager(helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, zkAddress);
     MinionTaskZkMetadataManager minionTaskZkMetadataManager = new 
MinionTaskZkMetadataManager(_helixManager);
     _taskExecutorFactoryRegistry = new 
TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
@@ -301,6 +309,12 @@ public abstract class BaseMinionStarter implements 
ServiceStartable {
             () -> _helixManager.isConnected() ? 1L : 0L);
     minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore());
     minionContext.setHelixManager(_helixManager);
+    LOGGER.info("Initializing and registering the 
DefaultClusterConfigChangeHandler");
+    try {
+      _helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
+    } catch (Exception e) {
+      LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as 
the Helix ClusterConfigChangeListener", e);
+    }
     LOGGER.info("Starting minion admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _minionAdminApplication = createMinionAdminApp();
     _minionAdminApplication.start(_listenerConfigs);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index c3084057770..466adf582d8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -20,6 +20,7 @@ package 
org.apache.pinot.segment.local.segment.creator.impl.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -57,7 +58,7 @@ public class SegmentPreIndexStatsCollectorImpl implements 
SegmentPreIndexStatsCo
       if (!dictionaryEnabled) {
         // MAP collector is optimised for no-dictionary collection
         if 
(!fieldSpec.getDataType().getStoredType().equals(FieldSpec.DataType.MAP)) {
-          if 
(_statsCollectorConfig.getTableConfig().getIndexingConfig().isOptimizeNoDictStatsCollection())
 {
+          if 
(ClusterConfigForTable.useOptimizedNoDictCollector(_statsCollectorConfig.getTableConfig()))
 {
             _columnStatsCollectorMap.put(column, new 
NoDictColumnStatisticsCollector(column, _statsCollectorConfig));
             continue;
           }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 29ecc13c55b..52a606e1ec4 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -46,6 +46,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnSta
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -1030,7 +1031,7 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
     boolean dictionaryEnabled = hasIndex(column, StandardIndexes.dictionary());
     // MAP collector is optimised for no-dictionary collection
     if (!dictionaryEnabled && storedType != DataType.MAP) {
-      if (_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection()) {
+      if (ClusterConfigForTable.useOptimizedNoDictCollector(_tableConfig)) {
         return new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
       }
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 2b4186ce5ea..d2acb18f4d8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -55,6 +55,7 @@ import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
@@ -672,7 +673,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
       boolean createDictionary = dictionaryIndexConfig.isEnabled();
       boolean useNoDictColumnStatsCollector = false;
       if (!dictionaryIndexConfig.isEnabled()) {
-        useNoDictColumnStatsCollector = 
_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection();
+        useNoDictColumnStatsCollector = 
ClusterConfigForTable.useOptimizedNoDictCollector(_tableConfig);
       }
       StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
       ColumnIndexCreationInfo indexCreationInfo;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
new file mode 100644
index 00000000000..bcb125e2d86
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Contains cluster level config for certain table config attributes.
+ * This is useful for enabling/disabling certain features across the cluster
+ * Each individual config will have its own precedence rules.
+ */
+public final class ClusterConfigForTable {
+
+  // Controls whether to use NoDictColumnStatisticsCollector for no-dictionary 
columns globally.
+  // If unset at cluster level, table-level config is used.
+  // If cluster level is set to true/false, it overrides the table-level 
config.
+  private static final String OPTIMISE_NO_DICT_STATS_COLLECTION_CONF = 
"pinot.stats.optimize.no.dict.collection";
+
+  private ClusterConfigForTable() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterConfigForTable.class);
+  // if a key isn't present, it means it is not set at cluster level
+  private static final Map<String, Boolean> CLUSTER_BOOLEAN_FLAGS = new 
ConcurrentHashMap<>();
+
+  /** Listener that updates the cached cluster override on config changes. */
+  public static class ConfigChangeListener implements 
PinotClusterConfigChangeListener {
+    @Override
+    public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+      checkNoDictStatsCollectorConfig(changedConfigs, clusterConfigs);
+    }
+
+    private void checkNoDictStatsCollectorConfig(Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
+      if (!changedConfigs.contains(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF)) {
+        return;
+      }
+      String v = clusterConfigs.get(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+      if (v == null) {
+        CLUSTER_BOOLEAN_FLAGS.remove(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+        return;
+      }
+      // Accept only explicit true/false, ignore invalid values by resetting 
to null to fall back to table config
+      String lower = v.trim().toLowerCase();
+      if ("true".equals(lower)) {
+        CLUSTER_BOOLEAN_FLAGS.put(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF, 
Boolean.TRUE);
+      } else if ("false".equals(lower)) {
+        CLUSTER_BOOLEAN_FLAGS.put(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF, 
Boolean.FALSE);
+      } else {
+        LOGGER.warn("Invalid value: {} for config: {}, ignoring and falling 
back to table config", v,
+            OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+        CLUSTER_BOOLEAN_FLAGS.remove(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+      }
+    }
+  }
+
+  /** Returns whether we should use the optimized collector, applying 
cluster-level override if set. */
+  public static boolean useOptimizedNoDictCollector(TableConfig tableConfig) {
+    Boolean override = 
CLUSTER_BOOLEAN_FLAGS.get(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+    if (override != null) {
+      return override;
+    }
+    return tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection();
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6add0ee925c..ddcfd5c2514 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -89,6 +89,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
 import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
 import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
 import 
org.apache.pinot.segment.local.utils.SegmentMultiColTextIndexPreprocessThrottler;
@@ -265,6 +266,10 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(
         new TextIndexUtils.LuceneMaxClauseCountConfigChangeListener());
     LOGGER.info("Registered Lucene max clause count configuration change 
listener");
+    // Register cluster-level override for table configs
+    _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+        new ClusterConfigForTable.ConfigChangeListener());
+    LOGGER.info("Registered ClusterConfigForTable change listener");
 
     LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: 
{}, instanceId: {}", _zkAddress,
         _helixClusterName, _instanceId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to