This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d488bd1e088 Make the ResourceUtilizationChecker more easily extensible and add a PK count endpoint to the server side (#16146) d488bd1e088 is described below commit d488bd1e0881ca93f4ed50d7be63e8424590050f Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Thu Jun 26 21:32:27 2025 -0700 Make the ResourceUtilizationChecker more easily extensible and add a PK count endpoint to the server side (#16146) * Expose server-side API to fetch the primary key count across all tables (upsert and dedup enabled) * Controller side changes to integrate primary key count check with resource utilization checker * Add support to skip the PK check from the Minion side * Rename class and methods * Fix log * Create a common interface for all resource utilization checkers and have disk and PK count implement that * Fix test * Minor rename * Remove the additional checker and related tests * Minor fixes * Address review comments * rename in test * Address review comments * Add an enum CheckPurpose to capture caller context, address other review comments * Add some comments around CheckPurpose --- .../restlet/resources/PrimaryKeyCountInfo.java | 73 ++++++ .../pinot/controller/BaseControllerStarter.java | 11 +- .../helix/core/minion/PinotTaskManager.java | 7 +- .../validation/DiskUtilizationChecker.java | 13 +- .../RealtimeSegmentValidationManager.java | 3 +- .../validation/ResourceUtilizationChecker.java | 12 +- .../validation/ResourceUtilizationManager.java | 24 +- .../controller/validation/UtilizationChecker.java | 58 +++++ .../validation/DiskUtilizationCheckerTest.java | 57 +++-- .../RealtimeSegmentValidationManagerTest.java | 4 +- .../validation/ResourceUtilizationManagerTest.java | 70 +++++- .../manager/realtime/RealtimeTableDataManager.java | 5 + .../dedup/BasePartitionDedupMetadataManager.java | 2 - .../local/dedup/BaseTableDedupMetadataManager.java | 10 + 
...ConcurrentMapPartitionDedupMetadataManager.java | 2 +- .../local/dedup/PartitionDedupMetadataManager.java | 2 + .../local/dedup/TableDedupMetadataManager.java | 8 + .../BasePartitionDedupMetadataManagerTest.java | 2 +- .../server/api/resources/InstanceResource.java | 22 ++ .../server/api/resources/PrimaryKeyCount.java | 90 +++++++ .../pinot/server/api/PrimaryKeyCountTest.java | 275 +++++++++++++++++++++ 21 files changed, 698 insertions(+), 52 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java new file mode 100644 index 00000000000..57dffa05326 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Set; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public class PrimaryKeyCountInfo { + private final String _instanceId; + private final long _numPrimaryKeys; + private final Set<String> _upsertAndDedupTables; + private final long _lastUpdatedTimeInEpochMs; + + public PrimaryKeyCountInfo(@JsonProperty("instanceId") String instanceId) { + _instanceId = instanceId; + _numPrimaryKeys = -1; + _upsertAndDedupTables = Set.of(); + _lastUpdatedTimeInEpochMs = -1; + } + + @JsonCreator + public PrimaryKeyCountInfo(@JsonProperty("instanceId") String instanceId, + @JsonProperty("numPrimaryKeys") long numPrimaryKeys, + @JsonProperty("upsertAndDedupTables") Set<String> upsertAndDedupTables, + @JsonProperty("lastUpdatedTimeInEpochMs") long lastUpdatedTimeInEpochMs) { + _instanceId = instanceId; + _numPrimaryKeys = numPrimaryKeys; + _upsertAndDedupTables = upsertAndDedupTables; + _lastUpdatedTimeInEpochMs = lastUpdatedTimeInEpochMs; + } + + public String getInstanceId() { + return _instanceId; + } + + public long getNumPrimaryKeys() { + return _numPrimaryKeys; + } + + public Set<String> getUpsertAndDedupTables() { + return _upsertAndDedupTables; + } + + public long getLastUpdatedTimeInEpochMs() { + return _lastUpdatedTimeInEpochMs; + } + + public String toString() { + return "PrimaryKeyCountInfo{" + "_instanceId='" + _instanceId + ", _numPrimaryKeys=" + _numPrimaryKeys + + ", _upsertAndDedupTables=" + _upsertAndDedupTables + ", _lastUpdatedTimeInEpochMs=" + + _lastUpdatedTimeInEpochMs + '}'; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 54854f0e5b0..8cca18613be 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -122,6 +122,7 @@ import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.controller.validation.ResourceUtilizationChecker; import org.apache.pinot.controller.validation.ResourceUtilizationManager; import org.apache.pinot.controller.validation.StorageQuotaChecker; +import org.apache.pinot.controller.validation.UtilizationChecker; import org.apache.pinot.core.periodictask.PeriodicTask; import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; @@ -212,6 +213,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected ExecutorService _rebalancerExecutorService; protected TableSizeReader _tableSizeReader; protected StorageQuotaChecker _storageQuotaChecker; + protected final List<UtilizationChecker> _utilizationCheckers = new ArrayList<>(); protected DiskUtilizationChecker _diskUtilizationChecker; protected ResourceUtilizationManager _resourceUtilizationManager; protected RebalancePreChecker _rebalancePreChecker; @@ -343,6 +345,10 @@ public abstract class BaseControllerStarter implements ServiceStartable { MAX_STATE_TRANSITIONS_PER_INSTANCE, constraintItem); } + protected void addUtilizationChecker(UtilizationChecker utilizationChecker) { + _utilizationCheckers.add(utilizationChecker); + } + /** * Creates an instance of PinotHelixResourceManager. 
* <p> @@ -554,7 +560,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { _helixResourceManager, _config); _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config); - _resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker); + addUtilizationChecker(_diskUtilizationChecker); + _resourceUtilizationManager = new ResourceUtilizationManager(_config, _utilizationCheckers); _rebalancePreChecker = RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass()); _rebalancePreChecker.init(_helixResourceManager, _executorService, _config.getRebalanceDiskUtilizationThreshold()); _rebalancerExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), @@ -903,7 +910,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { _controllerMetrics, _executorService, _connectionManager); periodicTasks.add(responseStoreCleaner); PeriodicTask resourceUtilizationChecker = new ResourceUtilizationChecker(_config, _connectionManager, - _controllerMetrics, _diskUtilizationChecker, _executorService, _helixResourceManager); + _controllerMetrics, _utilizationCheckers, _executorService, _helixResourceManager); periodicTasks.add(resourceUtilizationChecker); return periodicTasks; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index ef86dbab2d6..96f9154241f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -57,6 +57,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerato import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.validation.ResourceUtilizationManager; +import org.apache.pinot.controller.validation.UtilizationChecker; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; @@ -222,7 +223,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType); try { - if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType)) { + if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType, + UtilizationChecker.CheckPurpose.TASK_GENERATION)) { LOGGER.warn("Resource utilization is above threshold, skipping task creation for table: {}", tableName); _controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L); continue; @@ -721,7 +723,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { for (TableConfig tableConfig : enabledTableConfigs) { String tableName = tableConfig.getTableName(); try { - if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)) { + if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.TASK_GENERATION)) { String message = String.format("Skipping tasks generation as resource utilization is not within limits for " + "table: %s. Disk utilization for one or more servers hosting this table has exceeded the threshold. 
" + "Tasks won't be generated until the issue is mitigated.", tableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java index 8edd8b6cd11..fa2f1bfbea2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java @@ -36,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DiskUtilizationChecker { +public class DiskUtilizationChecker implements UtilizationChecker { private static final Logger LOGGER = LoggerFactory.getLogger(DiskUtilizationChecker.class); private final int _resourceUtilizationCheckTimeoutMs; private final long _resourceUtilizationCheckerFrequencyMs; @@ -54,10 +54,16 @@ public class DiskUtilizationChecker { _resourceUtilizationCheckerFrequencyMs = controllerConf.getResourceUtilizationCheckerFrequency() * 1000; } + @Override + public String getName() { + return DiskUtilizationChecker.class.getSimpleName(); + } + /** * Check if disk utilization for the requested table is within the configured limits. */ - public boolean isDiskUtilizationWithinLimits(String tableNameWithType) { + @Override + public boolean isResourceUtilizationWithinLimits(String tableNameWithType, UtilizationChecker.CheckPurpose purpose) { if (StringUtils.isEmpty(tableNameWithType)) { throw new IllegalArgumentException("Table name found to be null or empty while computing disk utilization."); } @@ -102,7 +108,8 @@ public class DiskUtilizationChecker { /** * Compute disk utilization for the requested instances using the <code>CompletionServiceHelper</code>. 
*/ - public void computeDiskUtilization(BiMap<String, String> endpointsToInstances, + @Override + public void computeResourceUtilization(BiMap<String, String> endpointsToInstances, CompletionServiceHelper completionServiceHelper) { List<String> diskUtilizationUris = new ArrayList<>(endpointsToInstances.size()); for (String endpoint : endpointsToInstances.keySet()) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 21d3d428024..39cf6f1b75f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -157,7 +157,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } // Perform resource utilization checks. 
boolean isResourceUtilizationWithinLimits = - _resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType); + _resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); if (!isResourceUtilizationWithinLimits) { LOGGER.warn("Resource utilization limit exceeded for table: {}", tableNameWithType); _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java index 60990f5151d..338d194c85e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.validation; import com.google.common.collect.BiMap; import java.util.HashSet; +import java.util.List; import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; @@ -45,18 +46,18 @@ public class ResourceUtilizationChecker extends BasePeriodicTask { private final PoolingHttpClientConnectionManager _connectionManager; private final ControllerMetrics _controllerMetrics; - private final DiskUtilizationChecker _diskUtilizationChecker; + private final List<UtilizationChecker> _utilizationCheckers; private final Executor _executor; private final PinotHelixResourceManager _helixResourceManager; public ResourceUtilizationChecker(ControllerConf config, PoolingHttpClientConnectionManager connectionManager, - ControllerMetrics controllerMetrics, DiskUtilizationChecker diskUtilizationChecker, Executor executor, + ControllerMetrics controllerMetrics, List<UtilizationChecker> utilizationCheckers, Executor executor, 
PinotHelixResourceManager pinotHelixResourceManager) { super(TASK_NAME, config.getResourceUtilizationCheckerFrequency(), config.getResourceUtilizationCheckerInitialDelay()); _connectionManager = connectionManager; _controllerMetrics = controllerMetrics; - _diskUtilizationChecker = diskUtilizationChecker; + _utilizationCheckers = utilizationCheckers; _executor = executor; _helixResourceManager = pinotHelixResourceManager; } @@ -77,7 +78,10 @@ public class ResourceUtilizationChecker extends BasePeriodicTask { BiMap<String, String> endpointsToInstances = instanceAdminEndpoints.inverse(); CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper(_executor, _connectionManager, endpointsToInstances); - _diskUtilizationChecker.computeDiskUtilization(endpointsToInstances, completionServiceHelper); + for (UtilizationChecker utilizationChecker : _utilizationCheckers) { + LOGGER.debug("Computing resource utilization for checker: {}", utilizationChecker.getName()); + utilizationChecker.computeResourceUtilization(endpointsToInstances, completionServiceHelper); + } } catch (Exception e) { LOGGER.error("Caught exception while running task: {}", _taskName, e); _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java index c53507913ba..396d6ba68c4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.validation; +import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.controller.ControllerConf; import org.slf4j.Logger; @@ -28,15 +29,16 @@ 
public class ResourceUtilizationManager { private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUtilizationManager.class); private final boolean _isResourceUtilizationCheckEnabled; - private final DiskUtilizationChecker _diskUtilizationChecker; + private final List<UtilizationChecker> _utilizationCheckers; - public ResourceUtilizationManager(ControllerConf controllerConf, DiskUtilizationChecker diskUtilizationChecker) { + public ResourceUtilizationManager(ControllerConf controllerConf, List<UtilizationChecker> utilizationCheckers) { _isResourceUtilizationCheckEnabled = controllerConf.isResourceUtilizationCheckEnabled(); - LOGGER.info("Resource utilization check is: {}", _isResourceUtilizationCheckEnabled ? "enabled" : "disabled"); - _diskUtilizationChecker = diskUtilizationChecker; + LOGGER.info("Resource utilization check is: {}, with {} resource utilization checkers", + _isResourceUtilizationCheckEnabled ? "enabled" : "disabled", utilizationCheckers.size()); + _utilizationCheckers = utilizationCheckers; } - public boolean isResourceUtilizationWithinLimits(String tableNameWithType) { + public boolean isResourceUtilizationWithinLimits(String tableNameWithType, UtilizationChecker.CheckPurpose purpose) { if (!_isResourceUtilizationCheckEnabled) { return true; } @@ -44,6 +46,16 @@ public class ResourceUtilizationManager { throw new IllegalArgumentException("Table name found to be null or empty while checking resource utilization."); } LOGGER.info("Checking resource utilization for table: {}", tableNameWithType); - return _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableNameWithType); + boolean overallIsResourceUtilizationWithinLimits = true; + for (UtilizationChecker utilizationChecker : _utilizationCheckers) { + boolean isResourceUtilizationWithinLimits = + utilizationChecker.isResourceUtilizationWithinLimits(tableNameWithType, purpose); + LOGGER.info("For utilization checker: {}, isResourceUtilizationWithinLimits: {}, purpose: {}", + 
utilizationChecker.getName(), isResourceUtilizationWithinLimits, purpose); + if (!isResourceUtilizationWithinLimits) { + overallIsResourceUtilizationWithinLimits = false; + } + } + return overallIsResourceUtilizationWithinLimits; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java new file mode 100644 index 00000000000..2a4cdc9ce19 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.validation; + +import com.google.common.collect.BiMap; +import org.apache.pinot.controller.util.CompletionServiceHelper; + + +/** + * Interface for all utilization checkers to be used by ResourceUtilizationManager and ResourceUtilizationChecker + */ +public interface UtilizationChecker { + /** + * Get the name of the utilization checker + */ + String getName(); + + /** + * Returns true if the resource's utilization is within limits + * @param tableNameWithType table name with type + * @param purpose purpose of this check + */ + boolean isResourceUtilizationWithinLimits(String tableNameWithType, CheckPurpose purpose); + + /** + * Computes the resource's utilization + * @param endpointsToInstances map of endpoints to instances + * @param completionServiceHelper the completion service helper + */ + void computeResourceUtilization(BiMap<String, String> endpointsToInstances, + CompletionServiceHelper completionServiceHelper); + + /** + * Passed to 'isResourceUtilizationWithinLimits' so that each 'UtilizationChecker' can decide if any special handling + * is required depending on the origin of the check + */ + enum CheckPurpose { + // REALTIME_INGESTION if the check is performed from the realtime ingestion code path to pause ingestion + // TASK_GENERATION if the check is performed from the task generation framework to pause creation of new tasks + REALTIME_INGESTION, TASK_GENERATION + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java index ea0b5aa2ba0..3e29c0c57f7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java @@ -44,28 +44,35 @@ public class DiskUtilizationCheckerTest { private 
static final String DISK_UTILIZATION_PATH = "/disk/utilization/path"; private PinotHelixResourceManager _helixResourceManager; - private ControllerConf _controllerConf; private DiskUtilizationChecker _diskUtilizationChecker; @BeforeMethod public void setUp() { _helixResourceManager = mock(PinotHelixResourceManager.class); - _controllerConf = mock(ControllerConf.class); + ControllerConf controllerConf = mock(ControllerConf.class); - when(_controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH); - when(_controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8); - when(_controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000); - when(_controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L); + when(controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH); + when(controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8); + when(controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000); + when(controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L); - _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _controllerConf); + _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, controllerConf); } @Test public void testIsDiskUtilizationWithinLimitsNullOrEmptyTableName() { Assert.assertThrows(IllegalArgumentException.class, - () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(null)); + () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits(null, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION)); Assert.assertThrows(IllegalArgumentException.class, - () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits("")); + () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits("", + UtilizationChecker.CheckPurpose.REALTIME_INGESTION)); + Assert.assertThrows(IllegalArgumentException.class, + () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits(null, + 
UtilizationChecker.CheckPurpose.TASK_GENERATION)); + Assert.assertThrows(IllegalArgumentException.class, + () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits("", + UtilizationChecker.CheckPurpose.TASK_GENERATION)); } @Test @@ -73,16 +80,26 @@ public class DiskUtilizationCheckerTest { String tableName = "test_OFFLINE"; when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null); - boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + boolean result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertTrue(result); + + result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertTrue(result); } @Test public void testIsDiskUtilizationWithinLimitsNonExistentRealtimeTable() { String tableName = "test_REALTIME"; - when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null); + when(_helixResourceManager.getRealtimeTableConfig(tableName)).thenReturn(null); - boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + boolean result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertTrue(result); + + result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertTrue(result); } @@ -107,7 +124,12 @@ public class DiskUtilizationCheckerTest { diskUsageInfoMap.put("server2", diskUsageInfo2); ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); - boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + boolean result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertTrue(result); + + result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertTrue(result); } @@ -132,7 +154,12 @@ public class DiskUtilizationCheckerTest { diskUsageInfoMap.put("server2", diskUsageInfo2); ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); - boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + boolean result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertFalse(result); + + result = _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertFalse(result); } @@ -162,7 +189,7 @@ public class DiskUtilizationCheckerTest { when(completionServiceHelper.doMultiGetRequest(anyList(), anyString(), anyBoolean(), anyMap(), anyInt(), anyString())).thenReturn(serviceResponse); - _diskUtilizationChecker.computeDiskUtilization(instanceAdminEndpoints.inverse(), completionServiceHelper); + _diskUtilizationChecker.computeResourceUtilization(instanceAdminEndpoints.inverse(), completionServiceHelper); DiskUsageInfo diskUsageInfo1 = ResourceUtilizationInfo.getDiskUsageInfo("server1"); DiskUsageInfo diskUsageInfo2 = ResourceUtilizationInfo.getDiskUsageInfo("server2"); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java index 7ca553f2b36..9c435a48e88 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java @@ -104,8 +104,8 @@ public class RealtimeSegmentValidationManagerTest { when(pauseStatus.getPauseFlag()).thenReturn(isTablePaused); 
when(pauseStatus.getReasonCode()).thenReturn(reasonCode); when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName)).thenReturn(pauseStatus); - when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)).thenReturn( - !isResourceUtilizationExceeded); + when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(!isResourceUtilizationExceeded); when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); when(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)).thenReturn(isQuotaExceeded); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java index bb61d6e0834..9487105d530 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.validation; +import java.util.List; import org.apache.pinot.controller.ControllerConf; import org.mockito.Mockito; import org.testng.Assert; @@ -28,6 +29,7 @@ import org.testng.annotations.Test; public class ResourceUtilizationManagerTest { private DiskUtilizationChecker _diskUtilizationChecker; + private List<UtilizationChecker> _utilizationCheckers; private ControllerConf _controllerConf; private ResourceUtilizationManager _resourceUtilizationManager; private final String _testTable = "myTable_OFFLINE"; @@ -35,51 +37,93 @@ public class ResourceUtilizationManagerTest { @BeforeMethod public void setUp() { _diskUtilizationChecker = Mockito.mock(DiskUtilizationChecker.class); + _utilizationCheckers = List.of(_diskUtilizationChecker); _controllerConf = Mockito.mock(ControllerConf.class); } @Test public void 
testIsResourceUtilizationWithinLimitsWhenCheckIsDisabled() { Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(false); - _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); - boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); + boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertTrue(result, "Resource utilization should be within limits when the check is disabled"); + + result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertTrue(result, "Resource utilization should be within limits when the check is disabled"); } @Test(expectedExceptions = IllegalArgumentException.class) public void testIsResourceUtilizationWithinLimitsWithNullTableName() { Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); - _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); + + _resourceUtilizationManager.isResourceUtilizationWithinLimits(null, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testIsResourceUtilizationWithinLimitsWithNullTableNameIsForMinionTrue() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); - _resourceUtilizationManager.isResourceUtilizationWithinLimits(null); + _resourceUtilizationManager.isResourceUtilizationWithinLimits(null, + 
UtilizationChecker.CheckPurpose.TASK_GENERATION); } @Test(expectedExceptions = IllegalArgumentException.class) public void testIsResourceUtilizationWithinLimitsWithEmptyTableName() { Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); - _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); - _resourceUtilizationManager.isResourceUtilizationWithinLimits(""); + _resourceUtilizationManager.isResourceUtilizationWithinLimits("", + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testIsResourceUtilizationWithinLimitsWithEmptyTableNameIsForMinionTrue() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); + + _resourceUtilizationManager.isResourceUtilizationWithinLimits("", + UtilizationChecker.CheckPurpose.TASK_GENERATION); } @Test public void testIsResourceUtilizationWithinLimitsWhenCheckIsEnabled() { Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); - Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(true); - _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); - boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); - Assert.assertTrue(result, "Resource utilization should be within limits when disk check returns true"); + boolean result = 
_resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertTrue(result, "Resource utilization should be within limits when disk check and primary key count " + + "check returns true"); + + Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.TASK_GENERATION)).thenReturn(true); + result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.TASK_GENERATION); + Assert.assertTrue(result, "Resource utilization should be within limits when disk check and primary key count " + + "check returns true"); } @Test public void testIsResourceUtilizationWithinLimitsWhenCheckFails() { Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); - Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(false); - _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(false); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers); + + boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.REALTIME_INGESTION); + Assert.assertFalse(result, "Resource utilization should not be within limits when disk check returns false"); - boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); + result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable, + UtilizationChecker.CheckPurpose.TASK_GENERATION); Assert.assertFalse(result, "Resource utilization should not be within limits when disk check returns false"); } } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index ab36b37eaaf..e25f8da05e2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -879,6 +879,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return _tableUpsertMetadataManager; } + @VisibleForTesting + public TableDedupMetadataManager getTableDedupMetadataManager() { + return _tableDedupMetadataManager; + } + /** * Retrieves a mapping of partition id to the primary key count for the partition. * diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java index 243509e3473..b8fa0d6a649 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java @@ -410,8 +410,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu _logger.info("Closed the metadata manager"); } - protected abstract long getNumPrimaryKeys(); - protected void updatePrimaryKeyGauge(long numPrimaryKeys) { _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, numPrimaryKeys); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java index 61988fd4ad1..b0ca500c7bd 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.dedup; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -123,6 +124,15 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat */ protected abstract PartitionDedupMetadataManager createPartitionDedupMetadataManager(Integer partitionId); + @Override + public Map<Integer, Long> getPartitionToPrimaryKeyCount() { + Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>(); + _partitionMetadataManagerMap.forEach( + (partitionID, dedupMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID, + dedupMetadataManager.getNumPrimaryKeys())); + return partitionToPrimaryKeyCount; + } + @Override public DedupContext getContext() { return _context; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java index b4ef9ca63ac..5a1c0440bd8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java @@ -141,7 +141,7 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad } @Override - protected long getNumPrimaryKeys() { + public long getNumPrimaryKeys() { return _primaryKeyToSegmentAndTimeMap.size(); } diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java index 8ccd7a51837..f3dece6b6c7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java @@ -93,4 +93,6 @@ public interface PartitionDedupMetadataManager extends Closeable { * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. */ void stop(); + + long getNumPrimaryKeys(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java index 03007c90349..a2d3065d565 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.dedup; import java.io.Closeable; +import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; @@ -49,4 +50,11 @@ public interface TableDedupMetadataManager extends Closeable { * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. */ void stop(); + + /** + * Retrieves a mapping of partition id to the primary key count for the partition. 
+ * + * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific partition + */ + Map<Integer, Long> getPartitionToPrimaryKeyCount(); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java index 475d4a39297..e02da4408e0 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java @@ -79,7 +79,7 @@ public class BasePartitionDedupMetadataManagerTest { } @Override - protected long getNumPrimaryKeys() { + public long getNumPrimaryKeys() { return 0; } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java index a285d074a5b..bfe42c47955 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java @@ -40,13 +40,16 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo; import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.helix.HelixHelper; +import 
org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.server.api.AdminApiApplication; import org.apache.pinot.server.starter.ServerInstance; @@ -126,4 +129,23 @@ public class InstanceResource { DiskUsageInfo diskUsageInfo = DiskUtilization.computeDiskUsage(_instanceId, pathStr); return ResourceUtils.convertToJsonString(diskUsageInfo); } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/primaryKeyCount") + @ApiOperation(value = "Show number of primary keys", notes = "Total number of upsert / dedup primary keys") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") + }) + public String getPrimaryKeyCountInfo(@Context HttpHeaders headers) + throws WebApplicationException { + // Use InstanceDataManager to fetch details about the number of primary keys across all upsert / dedup tables + InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); + if (instanceDataManager == null) { + throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR); + } + PrimaryKeyCountInfo primaryKeyCountInfo = + PrimaryKeyCount.computeNumberOfPrimaryKeys(_instanceId, instanceDataManager); + return ResourceUtils.convertToJsonString(primaryKeyCountInfo); + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java new file mode 100644 index 00000000000..39cbfa318e8 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.validation.constraints.NotNull; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PrimaryKeyCount { + private static final Logger LOGGER = LoggerFactory.getLogger(PrimaryKeyCount.class); + + private PrimaryKeyCount() { + } + + /** + * Computes the number of primary keys for this instance + */ + public static PrimaryKeyCountInfo computeNumberOfPrimaryKeys(String instanceId, + InstanceDataManager instanceDataManager) { + if (StringUtils.isEmpty(instanceId)) { + throw new IllegalArgumentException("InstanceID cannot be null or empty while computing the number of upsert / " + + "dedup primary keys."); + } + + if (instanceDataManager == null) { + throw new IllegalArgumentException("instanceDataManager cannot be null while computing the number of upsert / " + + "dedup primary keys."); + } + + Set<String> allTables = instanceDataManager.getAllTables(); + long totalPrimaryKeyCount = 0L; + Set<String> 
tablesWithPrimaryKeys = new HashSet<>(); + for (String tableNameWithType : allTables) { + TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager == null) { + LOGGER.warn("TableDataManager for table: {} is null, skipping", tableNameWithType); + continue; + } + if (tableDataManager instanceof RealtimeTableDataManager) { + Map<Integer, Long> partitionToPrimaryKeyCount = + getPartitionToPrimaryKeyCount((RealtimeTableDataManager) tableDataManager); + + if (!partitionToPrimaryKeyCount.isEmpty()) { + tablesWithPrimaryKeys.add(tableNameWithType); + } + + for (Long numPrimaryKeys : partitionToPrimaryKeyCount.values()) { + totalPrimaryKeyCount += numPrimaryKeys == null ? 0 : numPrimaryKeys; + } + } + } + + return new PrimaryKeyCountInfo(instanceId, totalPrimaryKeyCount, tablesWithPrimaryKeys, System.currentTimeMillis()); + } + + private static Map<Integer, Long> getPartitionToPrimaryKeyCount(@NotNull RealtimeTableDataManager tableDataManager) { + // Fetch the primary key count per partition if either upsert or dedup is enabled + if (tableDataManager.isUpsertEnabled()) { + return tableDataManager.getTableUpsertMetadataManager().getPartitionToPrimaryKeyCount(); + } else if (tableDataManager.isDedupEnabled()) { + return tableDataManager.getTableDedupMetadataManager().getPartitionToPrimaryKeyCount(); + } + return Map.of(); + } +} diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java new file mode 100644 index 00000000000..14ba30e6188 --- /dev/null +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; +import org.apache.pinot.server.api.resources.PrimaryKeyCount; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class PrimaryKeyCountTest { + + @Test + public void testComputePrimaryKeysForEmptyOrNullInstanceId() { + // Mock the instance data manager + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + + Assert.assertThrows(IllegalArgumentException.class, + () -> PrimaryKeyCount.computeNumberOfPrimaryKeys("", instanceDataManager)); + + Assert.assertThrows(IllegalArgumentException.class, + () -> PrimaryKeyCount.computeNumberOfPrimaryKeys(null, instanceDataManager)); + } + + @Test + public void testComputePrimaryKeysForNullInstanceDataManager() { + 
Assert.assertThrows(IllegalArgumentException.class, + () -> PrimaryKeyCount.computeNumberOfPrimaryKeys("instanceId", null)); + } + + + @Test + public void testComputePrimaryKeysNullTableDataManager() { + Set<String> allTables = new HashSet<>(); + allTables.add("myTable_REALTIME"); + allTables.add("myTable_OFFLINE"); + + // Mock the instance data manager + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager("myTable_REALTIME")).thenReturn(null); + when(instanceDataManager.getTableDataManager("myTable_OFFLINE")).thenReturn(null); + when(instanceDataManager.getAllTables()).thenReturn(allTables); + + String instanceId = "instance42"; + PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId, + instanceDataManager); + + Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId); + Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), 0); + Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables()); + Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 0); + Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis()); + } + + @Test + public void testComputePrimaryKeysNonUpsertDedupTables() { + Set<String> allTables = new HashSet<>(); + allTables.add("myTable_REALTIME"); + allTables.add("myTable_OFFLINE"); + + // Mock the Table data manager + RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class); + when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(false); + when(realtimeTableDataManager.isDedupEnabled()).thenReturn(false); + OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class); + when(realtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(null); + when(realtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(null); + + // Mock the instance data manager + InstanceDataManager 
instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager("myTable_REALTIME")) + .thenReturn(realtimeTableDataManager); + when(instanceDataManager.getTableDataManager("myTable_OFFLINE")) + .thenReturn(offlineTableDataManager); + when(instanceDataManager.getAllTables()).thenReturn(allTables); + + String instanceId = "instance42"; + PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId, + instanceDataManager); + + Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId); + Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), 0); + Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables()); + Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 0); + Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis()); + } + + @Test + public void testComputePrimaryKeysUpsertOnly() { + Set<String> allTables = new HashSet<>(); + allTables.add("myTable_REALTIME"); + allTables.add("myTable_OFFLINE"); + + Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>(); + partitionToPrimaryKeyCountMap.put(0, 42L); + partitionToPrimaryKeyCountMap.put(1, 420L); + partitionToPrimaryKeyCountMap.put(2, 4200L); + + long primaryKeyCount = 0; + for (Long count : partitionToPrimaryKeyCountMap.values()) { + primaryKeyCount += count; + } + + // Mock the Table data manager + RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class); + when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(true); + when(realtimeTableDataManager.isDedupEnabled()).thenReturn(false); + OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class); + + // Mock the upsert manager + TableUpsertMetadataManager tableUpsertMetadataManager = mock(TableUpsertMetadataManager.class); + when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap); 
+ when(realtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager); + + // Mock the instance data manager + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager("myTable_REALTIME")) + .thenReturn(realtimeTableDataManager); + when(instanceDataManager.getTableDataManager("myTable_OFFLINE")) + .thenReturn(offlineTableDataManager); + when(instanceDataManager.getAllTables()).thenReturn(allTables); + + String instanceId = "instance42"; + PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId, + instanceDataManager); + + Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId); + Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), primaryKeyCount); + Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables()); + Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 1); + Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTable_REALTIME")); + Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis()); + } + + @Test + public void testComputePrimaryKeysDedupOnly() { + Set<String> allTables = new HashSet<>(); + allTables.add("myTable_REALTIME"); + allTables.add("myTable_OFFLINE"); + + Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>(); + partitionToPrimaryKeyCountMap.put(0, 42L); + partitionToPrimaryKeyCountMap.put(1, 420L); + partitionToPrimaryKeyCountMap.put(2, 4200L); + + long primaryKeyCount = 0; + for (Long count : partitionToPrimaryKeyCountMap.values()) { + primaryKeyCount += count; + } + + // Mock the Table data manager + RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class); + when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(false); + when(realtimeTableDataManager.isDedupEnabled()).thenReturn(true); + OfflineTableDataManager offlineTableDataManager = 
mock(OfflineTableDataManager.class); + + // Mock the dedup manager + TableDedupMetadataManager tableDedupMetadataManager = mock(TableDedupMetadataManager.class); + when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap); + when(realtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager); + + // Mock the instance data manager + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager("myTable_REALTIME")) + .thenReturn(realtimeTableDataManager); + when(instanceDataManager.getTableDataManager("myTable_OFFLINE")) + .thenReturn(offlineTableDataManager); + when(instanceDataManager.getAllTables()).thenReturn(allTables); + + String instanceId = "instance42"; + PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId, + instanceDataManager); + + Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId); + Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), primaryKeyCount); + Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables()); + Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 1); + Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTable_REALTIME")); + Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis()); + } + + @Test + public void testComputePrimaryKeysUpsertAndDedup() { + Set<String> allTables = new HashSet<>(); + allTables.add("myTableUpsert_REALTIME"); + allTables.add("myTableDedup_REALTIME"); + allTables.add("myTable_OFFLINE"); + + Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>(); + upsertPartitionToPrimaryKeyCountMap.put(0, 42L); + upsertPartitionToPrimaryKeyCountMap.put(1, 420L); + upsertPartitionToPrimaryKeyCountMap.put(2, 4200L); + + Map<Integer, Long> dedupPartitionToPrimaryKeyCountMap = new HashMap<>(); + 
dedupPartitionToPrimaryKeyCountMap.put(0, 2042L); + dedupPartitionToPrimaryKeyCountMap.put(1, 777L); + + long primaryKeyCount = 0; + for (Long count : upsertPartitionToPrimaryKeyCountMap.values()) { + primaryKeyCount += count; + } + for (Long count : dedupPartitionToPrimaryKeyCountMap.values()) { + primaryKeyCount += count; + } + + // Mock the Table data manager + RealtimeTableDataManager upsertRealtimeTableDataManager = mock(RealtimeTableDataManager.class); + when(upsertRealtimeTableDataManager.isUpsertEnabled()).thenReturn(true); + when(upsertRealtimeTableDataManager.isDedupEnabled()).thenReturn(false); + RealtimeTableDataManager dedupRealtimeTableDataManager = mock(RealtimeTableDataManager.class); + when(dedupRealtimeTableDataManager.isUpsertEnabled()).thenReturn(false); + when(dedupRealtimeTableDataManager.isDedupEnabled()).thenReturn(true); + OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class); + + // Mock the upsert manager + TableUpsertMetadataManager tableUpsertMetadataManager = mock(TableUpsertMetadataManager.class); + when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(upsertPartitionToPrimaryKeyCountMap); + when(upsertRealtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager); + + // Mock the dedup manager + TableDedupMetadataManager tableDedupMetadataManager = mock(TableDedupMetadataManager.class); + when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(dedupPartitionToPrimaryKeyCountMap); + when(dedupRealtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager); + + // Mock the instance data manager + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager("myTableUpsert_REALTIME")) + .thenReturn(upsertRealtimeTableDataManager); + when(instanceDataManager.getTableDataManager("myTableDedup_REALTIME")) + 
.thenReturn(dedupRealtimeTableDataManager); + when(instanceDataManager.getTableDataManager("myTable_OFFLINE")) + .thenReturn(offlineTableDataManager); + when(instanceDataManager.getAllTables()).thenReturn(allTables); + + String instanceId = "instance42"; + PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId, + instanceDataManager); + + Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId); + Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), primaryKeyCount); + Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables()); + Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 2); + Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTableUpsert_REALTIME")); + Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTableDedup_REALTIME")); + Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org