This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb7d220d3ee Create Cluster conf for No dict stats collector (#16967)
fb7d220d3ee is described below
commit fb7d220d3ee70206187f8ab82639b34e436d3593
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Oct 8 15:46:28 2025 +0530
Create Cluster conf for No dict stats collector (#16967)
* Add ingestion skip filter for upsert SRT task
* Optimise index stats collector for no dict columns
* Test case fixes
* Add a no-dict stats collector to optimize no-dictionary columns without
changing each individual collector
* Add a no-dict stats collector to optimize no-dictionary columns without
changing each individual collector
* Checkstyle fixes
* Fix SegmentPreProcessorTest
* Linter fix
* Add support for native memory arrays
* Add ULL to approximate cardinality in NoDictColumnStatisticsCollector
* Revert "Fix SegmentPreProcessorTest"
This reverts commit 1fed6a27f0f2d8894814f24035d9a9feac6a495c.
* Fix tests after ULL approximation
* cstyle fixes
* Add tests related to no dict column
* cstyle fixes
* Add support for map type
* cstyle fixes
* 1. Add config to disable nodict column stats.
2. Add 5% buffer for cardinality estimation and update tests
* Fix test case
* Optimise map support for no dict columns
* Add documentation
* Update bits per element when cardinality changes
* Update cardinality from actual stats collector
* cstyle fixes
* test case fixes
* Add cluster config to choose stats collector for no dict columns
* Use HyperLogLog++ (HLL++) instead of ULL, as it generally returns an approximate
cardinality >= the actual cardinality. Disable NoDictStatsCollector by default.
* cstyle fixes
* Fix table config flag
* Fix tests after 10% buffer
* Address PR comments
* Create ClusterConfigForTable
* Register DefaultClusterConfigChangeHandler in minion
* Rename optimise to optimize
---
.../org/apache/pinot/minion/BaseMinionStarter.java | 14 ++++
.../stats/SegmentPreIndexStatsCollectorImpl.java | 3 +-
.../segment/index/loader/ForwardIndexHandler.java | 3 +-
.../defaultcolumn/BaseDefaultColumnHandler.java | 3 +-
.../segment/local/utils/ClusterConfigForTable.java | 87 ++++++++++++++++++++++
.../server/starter/helix/BaseServerStarter.java | 5 ++
6 files changed, 112 insertions(+), 3 deletions(-)
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index a5cffb1a607..a783e2dda5e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -39,6 +39,7 @@ import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.MinionGauge;
import org.apache.pinot.common.metrics.MinionMeter;
@@ -64,6 +65,7 @@ import
org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
import org.apache.pinot.minion.taskfactory.TaskFactoryRegistry;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -99,6 +101,7 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
protected MinionAdminApiApplication _minionAdminApplication;
protected List<ListenerConfig> _listenerConfigs;
protected ExecutorService _executorService;
+ protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
@Override
public void init(PinotConfiguration config)
@@ -124,6 +127,11 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
}
_listenerConfigs = ListenerConfigUtil.buildMinionConfigs(_config);
_tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, -1);
+ _clusterConfigChangeHandler = new DefaultClusterConfigChangeHandler();
+ // Register cluster-level override for table configs
+ _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+ new ClusterConfigForTable.ConfigChangeListener());
+ LOGGER.info("Registered ClusterConfigForTable change listener");
_helixManager = new ZKHelixManager(helixClusterName, _instanceId,
InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new
MinionTaskZkMetadataManager(_helixManager);
_taskExecutorFactoryRegistry = new
TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
@@ -301,6 +309,12 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
() -> _helixManager.isConnected() ? 1L : 0L);
minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore());
minionContext.setHelixManager(_helixManager);
+ LOGGER.info("Initializing and registering the
DefaultClusterConfigChangeHandler");
+ try {
+ _helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
+ } catch (Exception e) {
+ LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as
the Helix ClusterConfigChangeListener", e);
+ }
LOGGER.info("Starting minion admin application on: {}",
ListenerConfigUtil.toString(_listenerConfigs));
_minionAdminApplication = createMinionAdminApp();
_minionAdminApplication.start(_listenerConfigs);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index c3084057770..466adf582d8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -20,6 +20,7 @@ package
org.apache.pinot.segment.local.segment.creator.impl.stats;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -57,7 +58,7 @@ public class SegmentPreIndexStatsCollectorImpl implements
SegmentPreIndexStatsCo
if (!dictionaryEnabled) {
// MAP collector is optimised for no-dictionary collection
if
(!fieldSpec.getDataType().getStoredType().equals(FieldSpec.DataType.MAP)) {
- if
(_statsCollectorConfig.getTableConfig().getIndexingConfig().isOptimizeNoDictStatsCollection())
{
+ if
(ClusterConfigForTable.useOptimizedNoDictCollector(_statsCollectorConfig.getTableConfig()))
{
_columnStatsCollectorMap.put(column, new
NoDictColumnStatisticsCollector(column, _statsCollectorConfig));
continue;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 29ecc13c55b..52a606e1ec4 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -46,6 +46,7 @@ import
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnSta
import
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -1030,7 +1031,7 @@ public class ForwardIndexHandler extends BaseIndexHandler
{
boolean dictionaryEnabled = hasIndex(column, StandardIndexes.dictionary());
// MAP collector is optimised for no-dictionary collection
if (!dictionaryEnabled && storedType != DataType.MAP) {
- if (_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection()) {
+ if (ClusterConfigForTable.useOptimizedNoDictCollector(_tableConfig)) {
return new NoDictColumnStatisticsCollector(column,
statsCollectorConfig);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 2b4186ce5ea..d2acb18f4d8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -55,6 +55,7 @@ import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
@@ -672,7 +673,7 @@ public abstract class BaseDefaultColumnHandler implements
DefaultColumnHandler {
boolean createDictionary = dictionaryIndexConfig.isEnabled();
boolean useNoDictColumnStatsCollector = false;
if (!dictionaryIndexConfig.isEnabled()) {
- useNoDictColumnStatsCollector =
_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection();
+ useNoDictColumnStatsCollector =
ClusterConfigForTable.useOptimizedNoDictCollector(_tableConfig);
}
StatsCollectorConfig statsCollectorConfig = new
StatsCollectorConfig(_tableConfig, _schema, null);
ColumnIndexCreationInfo indexCreationInfo;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
new file mode 100644
index 00000000000..bcb125e2d86
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ClusterConfigForTable.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Contains cluster level config for certain table config attributes.
+ * This is useful for enabling/disabling certain features across the cluster
+ * Each individual config will have its own precedence rules.
+ */
+public final class ClusterConfigForTable {
+
+ // Controls whether to use NoDictColumnStatisticsCollector for no-dictionary
columns globally.
+ // If unset at cluster level, table-level config is used.
+ // If cluster level is set to true/false, it overrides the table-level
config.
+ private static final String OPTIMISE_NO_DICT_STATS_COLLECTION_CONF =
"pinot.stats.optimize.no.dict.collection";
+
+ private ClusterConfigForTable() {
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterConfigForTable.class);
+ // if a key isn't present, it means it is not set at cluster level
+ private static final Map<String, Boolean> CLUSTER_BOOLEAN_FLAGS = new
ConcurrentHashMap<>();
+
+ /** Listener that updates the cached cluster override on config changes. */
+ public static class ConfigChangeListener implements
PinotClusterConfigChangeListener {
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ checkNoDictStatsCollectorConfig(changedConfigs, clusterConfigs);
+ }
+
+ private void checkNoDictStatsCollectorConfig(Set<String> changedConfigs,
Map<String, String> clusterConfigs) {
+ if (!changedConfigs.contains(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF)) {
+ return;
+ }
+ String v = clusterConfigs.get(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+ if (v == null) {
+ CLUSTER_BOOLEAN_FLAGS.remove(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+ return;
+ }
+ // Accept only explicit true/false, ignore invalid values by resetting
to null to fall back to table config
+ String lower = v.trim().toLowerCase();
+ if ("true".equals(lower)) {
+ CLUSTER_BOOLEAN_FLAGS.put(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF,
Boolean.TRUE);
+ } else if ("false".equals(lower)) {
+ CLUSTER_BOOLEAN_FLAGS.put(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF,
Boolean.FALSE);
+ } else {
+ LOGGER.warn("Invalid value: {} for config: {}, ignoring and falling
back to table config", v,
+ OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+ CLUSTER_BOOLEAN_FLAGS.remove(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+ }
+ }
+ }
+
+ /** Returns whether we should use the optimized collector, applying
cluster-level override if set. */
+ public static boolean useOptimizedNoDictCollector(TableConfig tableConfig) {
+ Boolean override =
CLUSTER_BOOLEAN_FLAGS.get(OPTIMISE_NO_DICT_STATS_COLLECTION_CONF);
+ if (override != null) {
+ return override;
+ }
+ return tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection();
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6add0ee925c..ddcfd5c2514 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -89,6 +89,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
+import org.apache.pinot.segment.local.utils.ClusterConfigForTable;
import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
import
org.apache.pinot.segment.local.utils.SegmentMultiColTextIndexPreprocessThrottler;
@@ -265,6 +266,10 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_clusterConfigChangeHandler.registerClusterConfigChangeListener(
new TextIndexUtils.LuceneMaxClauseCountConfigChangeListener());
LOGGER.info("Registered Lucene max clause count configuration change
listener");
+ // Register cluster-level override for table configs
+ _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+ new ClusterConfigForTable.ConfigChangeListener());
+ LOGGER.info("Registered ClusterConfigForTable change listener");
LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName:
{}, instanceId: {}", _zkAddress,
_helixClusterName, _instanceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]