This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7239d34117 Added support to pause and resume ingestion based on resource utilization (#15008) 7239d34117 is described below commit 7239d3411719f89370de8645979061647a843354 Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com> AuthorDate: Sun Feb 16 21:04:34 2025 -0800 Added support to pause and resume ingestion based on resource utilization (#15008) --- .../pinot/common/metrics/ControllerGauge.java | 5 +- .../common/restlet/resources/DiskUsageInfo.java | 74 +++++++++ .../pinot/controller/BaseControllerStarter.java | 19 ++- .../apache/pinot/controller/ControllerConf.java | 38 +++++ .../helix/core/minion/PinotTaskManager.java | 25 ++- .../validation/DiskUtilizationChecker.java | 136 ++++++++++++++++ .../RealtimeSegmentValidationManager.java | 38 ++++- .../validation/ResourceUtilizationChecker.java | 86 ++++++++++ .../validation/ResourceUtilizationInfo.java | 45 ++++++ .../validation/ResourceUtilizationManager.java | 49 ++++++ ...ControllerPeriodicTaskStarterStatelessTest.java | 2 +- .../validation/DiskUtilizationCheckerTest.java | 178 +++++++++++++++++++++ .../RealtimeSegmentValidationManagerTest.java | 116 ++++++++++++++ .../validation/ResourceUtilizationInfoTest.java | 64 ++++++++ .../validation/ResourceUtilizationManagerTest.java | 85 ++++++++++ .../server/api/resources/DiskUtilization.java | 66 ++++++++ .../server/api/resources/InstanceResource.java | 24 +++ .../pinot/server/api/DiskUtilizationTest.java | 52 ++++++ .../apache/pinot/spi/config/table/PauseState.java | 2 +- 19 files changed, 1094 insertions(+), 10 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 66bb2be961..40e2d09b4f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -187,7 +187,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false), // Number of reingested segments getting uploaded - REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true); + REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true), + + // Resource utilization is within limits or not for a table + RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", false); private final String _gaugeName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java new file mode 100644 index 0000000000..7f5d6ed138 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * A simple container class to hold disk usage information. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class DiskUsageInfo { + private final String _instanceId; + private final String _path; + private final long _totalSpaceBytes; + private final long _usedSpaceBytes; + private final long _lastUpdatedTimeInEpochMs; + + @JsonCreator + public DiskUsageInfo(@JsonProperty("instanceId") String instanceId, @JsonProperty("path") String path, + @JsonProperty("totalSpaceBytes") long totalSpaceBytes, + @JsonProperty("usedSpaceBytes") long usedSpaceBytes, + @JsonProperty("lastUpdatedTimeInEpochMs") long lastUpdatedTimeInEpochMs) { + _instanceId = instanceId; + _path = path; + _totalSpaceBytes = totalSpaceBytes; + _usedSpaceBytes = usedSpaceBytes; + _lastUpdatedTimeInEpochMs = lastUpdatedTimeInEpochMs; + } + + public String getInstanceId() { + return _instanceId; + } + + public String getPath() { + return _path; + } + + public long getTotalSpaceBytes() { + return _totalSpaceBytes; + } + + public long getUsedSpaceBytes() { + return _usedSpaceBytes; + } + + public long getLastUpdatedTimeInEpochMs() { + return _lastUpdatedTimeInEpochMs; + } + + public String toString() { + return "DiskUsageInfo{" + "_instanceId='" + _instanceId + '\'' + ", _path='" + _path + '\'' + ", _totalSpaceBytes=" + + _totalSpaceBytes + ", _usedSpaceBytes=" + _usedSpaceBytes + ", _lastUpdatedTimeInEpochMs=" + + _lastUpdatedTimeInEpochMs + '}'; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index e95e0f5d87..5761cb27e5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -113,8 +113,11 @@ import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.controller.tuner.TableConfigTunerRegistry; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.controller.validation.BrokerResourceValidationManager; +import org.apache.pinot.controller.validation.DiskUtilizationChecker; import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; +import org.apache.pinot.controller.validation.ResourceUtilizationChecker; +import org.apache.pinot.controller.validation.ResourceUtilizationManager; import org.apache.pinot.controller.validation.StorageQuotaChecker; import org.apache.pinot.core.periodictask.PeriodicTask; import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; @@ -204,6 +207,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected ExecutorService _tenantRebalanceExecutorService; protected TableSizeReader _tableSizeReader; protected StorageQuotaChecker _storageQuotaChecker; + protected DiskUtilizationChecker _diskUtilizationChecker; + protected ResourceUtilizationManager _resourceUtilizationManager; @Override public void init(PinotConfiguration pinotConfiguration) @@ -510,6 +515,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager, _helixResourceManager, _config); + _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config); + _resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker); + // Setting up periodic tasks List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks(); LOGGER.info("Init controller periodic tasks scheduler"); @@ -561,6 +569,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { bind(_tenantRebalancer).to(TenantRebalancer.class); bind(_tableSizeReader).to(TableSizeReader.class); bind(_storageQuotaChecker).to(StorageQuotaChecker.class); + bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class); + bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class); bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME); String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR); if (loggerRootDir != null) { @@ -862,7 +872,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { _taskManagerStatusCache = getTaskManagerStatusCache(); _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config, - _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager); + _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager, + _resourceUtilizationManager); periodicTasks.add(_taskManager); _retentionManager = new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); @@ -873,7 +884,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { periodicTasks.add(_offlineSegmentIntervalChecker); _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager, - _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker); + _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker, + _resourceUtilizationManager); periodicTasks.add(_realtimeSegmentValidationManager); _brokerResourceValidationManager = new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics); @@ -902,6 +914,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { PeriodicTask responseStoreCleaner = new ResponseStoreCleaner(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics, _executorService, _connectionManager); periodicTasks.add(responseStoreCleaner); + PeriodicTask resourceUtilizationChecker = new ResourceUtilizationChecker(_config, _connectionManager, + _controllerMetrics, _diskUtilizationChecker, _executorService, _helixResourceManager); + periodicTasks.add(resourceUtilizationChecker); return periodicTasks; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 5ea1fd20cf..d3396cd3a7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -300,6 +300,14 @@ public class ControllerConf extends PinotConfiguration { private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks"; private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check"; + private static final String DISK_UTILIZATION_THRESHOLD = "controller.disk.utilization.threshold"; // 0 < threshold < 1 + private static final String DISK_UTILIZATION_CHECK_TIMEOUT_MS = "controller.disk.utilization.check.timeoutMs"; + private static final String DISK_UTILIZATION_PATH = "controller.disk.utilization.path"; + private static final String ENABLE_RESOURCE_UTILIZATION_CHECK = "controller.enable.resource.utilization.check"; + private static final String RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY = + "controller.resource.utilization.checker.initial.delay"; + private static final String RESOURCE_UTILIZATION_CHECKER_FREQUENCY = + "controller.resource.utilization.checker.frequency"; private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode"; public static final String DIM_TABLE_MAX_SIZE = "controller.dimTable.maxSize"; @@ -323,6 +331,12 @@ public class ControllerConf extends PinotConfiguration { private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1; private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64; private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true; + private static final double DEFAULT_DISK_UTILIZATION_THRESHOLD = 0.95; + private static final int DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS = 30_000; + private static final String DEFAULT_DISK_UTILIZATION_PATH = "/home/pinot/data"; + private static final boolean DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK = false; + private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY = 300L; // 5 minutes + private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY = 300L; // 5 minutes private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false; // Disallow any high level consumer (HLC) table private static final boolean DEFAULT_ALLOW_HLC_TABLES = false; @@ -987,6 +1001,30 @@ public class ControllerConf extends PinotConfiguration { return getProperty(ENABLE_STORAGE_QUOTA_CHECK, DEFAULT_ENABLE_STORAGE_QUOTA_CHECK); } + public String getDiskUtilizationPath() { + return getProperty(DISK_UTILIZATION_PATH, DEFAULT_DISK_UTILIZATION_PATH); + } + + public double getDiskUtilizationThreshold() { + return getProperty(DISK_UTILIZATION_THRESHOLD, DEFAULT_DISK_UTILIZATION_THRESHOLD); + } + + public int getDiskUtilizationCheckTimeoutMs() { + return getProperty(DISK_UTILIZATION_CHECK_TIMEOUT_MS, DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS); + } + + public long getResourceUtilizationCheckerInitialDelay() { + return getProperty(RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY, DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY); + } + + public long getResourceUtilizationCheckerFrequency() { + return getProperty(RESOURCE_UTILIZATION_CHECKER_FREQUENCY, DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY); + } + + public boolean isResourceUtilizationCheckEnabled() { + return getProperty(ENABLE_RESOURCE_UTILIZATION_CHECK, DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK); + } + public boolean getEnableBatchMessageMode() { return getProperty(ENABLE_BATCH_MESSAGE_MODE, DEFAULT_ENABLE_BATCH_MESSAGE_MODE); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 9f1f938187..c62d7a1035 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -55,6 +55,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import org.apache.pinot.controller.validation.ResourceUtilizationManager; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; @@ -100,6 +101,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { private final PinotHelixTaskResourceManager _helixTaskResourceManager; private final ClusterInfoAccessor _clusterInfoAccessor; private final TaskGeneratorRegistry _taskGeneratorRegistry; + private final ResourceUtilizationManager _resourceUtilizationManager; // For cron-based scheduling private final Scheduler _scheduler; @@ -120,11 +122,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor, - PoolingHttpClientConnectionManager connectionManager) { + PoolingHttpClientConnectionManager connectionManager, ResourceUtilizationManager resourceUtilizationManager) { super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager, controllerMetrics); _helixTaskResourceManager = helixTaskResourceManager; + _resourceUtilizationManager = resourceUtilizationManager; _taskManagerStatusCache = taskManagerStatusCache; _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics, @@ -205,6 +208,16 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { for (String tableNameWithType : tableNameWithTypes) { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType); + try { + if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType)) { + LOGGER.warn("Resource utilization is above threshold, skipping task creation for table: {}", tableName); + _controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L); + continue; + } + _controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L); + } catch (Exception e) { + LOGGER.warn("Caught exception while checking resource utilization for table: {}", tableName, e); + } List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs); if (pinotTaskConfigs.isEmpty()) { LOGGER.warn("No ad-hoc task generated for task type: {}", taskType); @@ -695,6 +708,16 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { for (TableConfig tableConfig : enabledTableConfigs) { String tableName = tableConfig.getTableName(); try { + if (!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)) { + String message = String.format("Skipping tasks generation as resource utilization is not within limits for " + + "table: %s. Disk utilization for one or more servers hosting this table has exceeded the threshold. " + + "Tasks won't be generated until the issue is mitigated.", tableName); + LOGGER.warn(message); + response.addSchedulingError(message); + _controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L); + continue; + } + _controllerMetrics.setOrUpdateTableGauge(tableName, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L); String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig); List<PinotTaskConfig> presentTaskConfig = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java new file mode 100644 index 0000000000..8edd8b6cd1 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import com.google.common.collect.BiMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DiskUtilizationChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(DiskUtilizationChecker.class); + private final int _resourceUtilizationCheckTimeoutMs; + private final long _resourceUtilizationCheckerFrequencyMs; + private final double _diskUtilizationThreshold; + private final String _diskUtilizationPath; + public static final String DISK_UTILIZATION_API_PATH = "/instance/diskUtilization"; + + private final PinotHelixResourceManager _helixResourceManager; + + public DiskUtilizationChecker(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf) { + _helixResourceManager = helixResourceManager; + _diskUtilizationPath = controllerConf.getDiskUtilizationPath(); + _diskUtilizationThreshold = controllerConf.getDiskUtilizationThreshold(); + _resourceUtilizationCheckTimeoutMs = controllerConf.getDiskUtilizationCheckTimeoutMs(); + _resourceUtilizationCheckerFrequencyMs = controllerConf.getResourceUtilizationCheckerFrequency() * 1000; + } + + /** + * Check if disk utilization for the requested table is within the configured limits. + */ + public boolean isDiskUtilizationWithinLimits(String tableNameWithType) { + if (StringUtils.isEmpty(tableNameWithType)) { + throw new IllegalArgumentException("Table name found to be null or empty while computing disk utilization."); + } + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Table config for table: {} is null", tableNameWithType); + return true; // table does not exist + } + List<String> instances; + if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + instances = _helixResourceManager.getServerInstancesForTable(tableNameWithType, TableType.OFFLINE); + } else { + instances = _helixResourceManager.getServerInstancesForTable(tableNameWithType, TableType.REALTIME); + } + return isDiskUtilizationWithinLimits(instances); + } + + private boolean isDiskUtilizationWithinLimits(List<String> instances) { + for (String instance : instances) { + DiskUsageInfo diskUsageInfo = ResourceUtilizationInfo.getDiskUsageInfo(instance); + if (diskUsageInfo == null) { + LOGGER.warn("Disk utilization info for server: {} is null", instance); + continue; + } + // Ignore if the disk utilization info is stale. The info is considered stale if it is older than the + // ResourceUtilizationChecker tasks frequency. + if (diskUsageInfo.getLastUpdatedTimeInEpochMs() + < System.currentTimeMillis() - _resourceUtilizationCheckerFrequencyMs) { + LOGGER.warn("Disk utilization info for server: {} is stale", instance); + continue; + } + if (diskUsageInfo.getUsedSpaceBytes() > diskUsageInfo.getTotalSpaceBytes() * _diskUtilizationThreshold) { + LOGGER.warn("Disk utilization for server: {} is above threshold: {}%. UsedBytes: {}, TotalBytes: {}", + instance, diskUsageInfo.getUsedSpaceBytes() * 100 / diskUsageInfo.getTotalSpaceBytes(), diskUsageInfo + .getUsedSpaceBytes(), diskUsageInfo.getTotalSpaceBytes()); + return false; + } + } + return true; + } + + /** + * Compute disk utilization for the requested instances using the <code>CompletionServiceHelper</code>. + */ + public void computeDiskUtilization(BiMap<String, String> endpointsToInstances, + CompletionServiceHelper completionServiceHelper) { + List<String> diskUtilizationUris = new ArrayList<>(endpointsToInstances.size()); + for (String endpoint : endpointsToInstances.keySet()) { + String diskUtilizationUri = endpoint + DISK_UTILIZATION_API_PATH; + diskUtilizationUris.add(diskUtilizationUri); + } + Map<String, String> reqHeaders = new HashMap<>(); + reqHeaders.put("diskUtilizationPath", _diskUtilizationPath); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(diskUtilizationUris, "no-op", false, reqHeaders, + _resourceUtilizationCheckTimeoutMs, "get disk utilization info from servers"); + LOGGER.info("Service response: {}", serviceResponse._httpResponses); + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { + try { + DiskUsageInfo diskUsageInfo = JsonUtils.stringToObject(streamResponse.getValue(), DiskUsageInfo.class); + if (diskUsageInfo != null && StringUtils.isNotEmpty(diskUsageInfo.getInstanceId())) { + LOGGER.debug("Disk utilization for instance: {} is {}", diskUsageInfo.getInstanceId(), diskUsageInfo); + diskUsageInfoMap.put(diskUsageInfo.getInstanceId(), diskUsageInfo); + } else { + LOGGER.warn("Disk utilization info for server {} is null or empty", streamResponse.getKey()); + } + } catch (Exception e) { + LOGGER.warn("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); + } + } + if (!diskUsageInfoMap.isEmpty()) { + ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 398e06c272..f25f27bda0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; @@ -58,6 +59,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final ValidationMetrics _validationMetrics; private final ControllerMetrics _controllerMetrics; private final StorageQuotaChecker _storageQuotaChecker; + private final ResourceUtilizationManager _resourceUtilizationManager; private final int _segmentLevelValidationIntervalInSeconds; private long _lastSegmentLevelValidationRunTimeMs = 0L; @@ -67,7 +69,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, - ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics, StorageQuotaChecker quotaChecker) { + ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics, StorageQuotaChecker quotaChecker, + ResourceUtilizationManager resourceUtilizationManager) { super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(), config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); @@ -75,6 +78,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea _validationMetrics = validationMetrics; _controllerMetrics = controllerMetrics; _storageQuotaChecker = quotaChecker; + _resourceUtilizationManager = resourceUtilizationManager; _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); _segmentAutoResetOnErrorAtValidation = config.isAutoResetErrorSegmentsOnValidationEnabled(); @@ -140,13 +144,39 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea * Skips updating the pause state if table is paused by admin. * Returns true if table is not paused */ - private boolean shouldEnsureConsuming(String tableNameWithType) { + @VisibleForTesting + boolean shouldEnsureConsuming(String tableNameWithType) { PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType); boolean isTablePaused = pauseStatus.getPauseFlag(); - // if table is paused by admin then don't compute if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) { - return false; + return false; // if table is paused by admin, then skip subsequent checks + } + // Perform resource utilization checks. + boolean isResourceUtilizationWithinLimits = + _resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType); + if (!isResourceUtilizationWithinLimits) { + LOGGER.warn("Resource utilization limit exceeded for table: {}", tableNameWithType); + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, + 1L); + // update IS when table is not paused or paused with another reason code + if (!isTablePaused || !pauseStatus.getReasonCode() + .equals(PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED)) { + _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, + PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, "Resource utilization limit exceeded."); + } + return false; // if resource utilization check failed, then skip subsequent checks + // within limits and table previously paused by resource utilization --> unpause + } else if (isTablePaused && pauseStatus.getReasonCode() + .equals(PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED)) { + // unset the pause state and allow consuming segment recreation. + _llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false, + PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, "Resource utilization within limits"); + pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType); + isTablePaused = pauseStatus.getPauseFlag(); } + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, + 0L); + // Perform storage quota checks. TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); if (isQuotaExceeded == isTablePaused) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java new file mode 100644 index 0000000000..60990f5151 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import com.google.common.collect.BiMap; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Executor; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is responsible for checking resource utilization for Pinot instances. To begin with, it checks + * disk utilization for all server instances. The computed disk utilization is stored in the class + * <code>org.apache.pinot.controller.validation.ResourceUtilizationInfo</code>. + */ +public class ResourceUtilizationChecker extends BasePeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUtilizationChecker.class); + private final static String TASK_NAME = ResourceUtilizationChecker.class.getSimpleName(); + + private final PoolingHttpClientConnectionManager _connectionManager; + private final ControllerMetrics _controllerMetrics; + private final DiskUtilizationChecker _diskUtilizationChecker; + private final Executor _executor; + private final PinotHelixResourceManager _helixResourceManager; + + public ResourceUtilizationChecker(ControllerConf config, PoolingHttpClientConnectionManager connectionManager, + ControllerMetrics controllerMetrics, DiskUtilizationChecker diskUtilizationChecker, Executor executor, + PinotHelixResourceManager pinotHelixResourceManager) { + super(TASK_NAME, config.getResourceUtilizationCheckerFrequency(), + config.getResourceUtilizationCheckerInitialDelay()); + _connectionManager = connectionManager; + _controllerMetrics = controllerMetrics; + _diskUtilizationChecker = diskUtilizationChecker; + _executor = executor; + _helixResourceManager = pinotHelixResourceManager; + } + + @Override + protected final void runTask(Properties periodicTaskProperties) { + _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); + Set<String> instances = new HashSet<>(); + try { + Set<String> serverTenantNames = _helixResourceManager.getAllServerTenantNames(); + for (String serverTenantName : serverTenantNames) { + Set<String> instancesForServerTenant = _helixResourceManager.getAllInstancesForServerTenant(serverTenantName); + if (!instancesForServerTenant.isEmpty()) { + instances.addAll(instancesForServerTenant); + } + } + BiMap<String, String> instanceAdminEndpoints = _helixResourceManager.getDataInstanceAdminEndpoints(instances); + BiMap<String, String> endpointsToInstances = instanceAdminEndpoints.inverse(); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, endpointsToInstances); + _diskUtilizationChecker.computeDiskUtilization(endpointsToInstances, completionServiceHelper); + } catch (Exception e) { + LOGGER.error("Caught exception while running task: {}", _taskName, e); + _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java new file mode 100644 index 0000000000..d781311ce0 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; + + +/** + * This class is used to capture resource utilization information for all instances. The periodic task, + * <code>ResourceUtilizationChecker</code>, will update this information. + */ +public class ResourceUtilizationInfo { + + private ResourceUtilizationInfo() { + } + + // Assumption – instanceId is unique across all tenants + private static Map<String, DiskUsageInfo> _instanceDiskUsageInfo = new HashMap<>(); + + public static DiskUsageInfo getDiskUsageInfo(String instanceId) { + return _instanceDiskUsageInfo.get(instanceId); + } + + public static void setDiskUsageInfo(Map<String, DiskUsageInfo> newDiskUsageInfo) { + _instanceDiskUsageInfo = newDiskUsageInfo; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java new file mode 100644 index 0000000000..c53507913b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.controller.ControllerConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ResourceUtilizationManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUtilizationManager.class); + + private final boolean _isResourceUtilizationCheckEnabled; + private final DiskUtilizationChecker _diskUtilizationChecker; + + public ResourceUtilizationManager(ControllerConf controllerConf, DiskUtilizationChecker diskUtilizationChecker) { + _isResourceUtilizationCheckEnabled = controllerConf.isResourceUtilizationCheckEnabled(); + LOGGER.info("Resource utilization check is: {}", _isResourceUtilizationCheckEnabled ? "enabled" : "disabled"); + _diskUtilizationChecker = diskUtilizationChecker; + } + + public boolean isResourceUtilizationWithinLimits(String tableNameWithType) { + if (!_isResourceUtilizationCheckEnabled) { + return true; + } + if (StringUtils.isEmpty(tableNameWithType)) { + throw new IllegalArgumentException("Table name found to be null or empty while checking resource utilization."); + } + LOGGER.info("Checking resource utilization for table: {}", tableNameWithType); + return _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableNameWithType); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java index 305c0a26a0..413d23c383 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java @@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest extends ControllerTest { } private class MockControllerStarter extends ControllerStarter { - private static final int NUM_PERIODIC_TASKS = 12; + private static final int NUM_PERIODIC_TASKS = 13; public MockControllerStarter() { super(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java new file mode 100644 index 0000000000..ea0b5aa2ba --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class DiskUtilizationCheckerTest { + + private static final String DISK_UTILIZATION_PATH = "/disk/utilization/path"; + private PinotHelixResourceManager _helixResourceManager; + private ControllerConf _controllerConf; + private DiskUtilizationChecker _diskUtilizationChecker; + + @BeforeMethod + public void setUp() { + _helixResourceManager = mock(PinotHelixResourceManager.class); + _controllerConf = mock(ControllerConf.class); + + when(_controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH); + when(_controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8); + when(_controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000); + when(_controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L); + + _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _controllerConf); + } + + @Test + public void testIsDiskUtilizationWithinLimitsNullOrEmptyTableName() { + Assert.assertThrows(IllegalArgumentException.class, + () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(null)); + Assert.assertThrows(IllegalArgumentException.class, + () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits("")); + } + + @Test + public void testIsDiskUtilizationWithinLimitsNonExistentOfflineTable() { + String tableName = "test_OFFLINE"; + when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null); + + boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + Assert.assertTrue(result); + } + + @Test + public void testIsDiskUtilizationWithinLimitsNonExistentRealtimeTable() { + String tableName = "test_REALTIME"; + when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null); + + boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + Assert.assertTrue(result); + } + + @Test + public void testIsDiskUtilizationWithinLimitsValidOfflineTable() { + String tableName = "test_OFFLINE"; + + TableConfig mockTableConfig = mock(TableConfig.class); + when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(mockTableConfig); + + List<String> mockInstances = Arrays.asList("server1", "server2"); + when(_helixResourceManager.getServerInstancesForTable(tableName, TableType.OFFLINE)).thenReturn(mockInstances); + + // Mock disk usage + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + DiskUsageInfo diskUsageInfo1 = + new DiskUsageInfo("server1", DISK_UTILIZATION_PATH, 1000L, 500L, System.currentTimeMillis()); + diskUsageInfoMap.put("server1", diskUsageInfo1); + + DiskUsageInfo diskUsageInfo2 = + new DiskUsageInfo("server2", DISK_UTILIZATION_PATH, 2000L, 1000L, System.currentTimeMillis()); + diskUsageInfoMap.put("server2", diskUsageInfo2); + ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); + + boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + Assert.assertTrue(result); + } + + @Test + public void testIsDiskUtilizationWithinLimitsAboveThreshold() { + String tableName = "test_OFFLINE"; + + TableConfig mockTableConfig = mock(TableConfig.class); + when(_helixResourceManager.getTableConfig(tableName)).thenReturn(mockTableConfig); + + List<String> mockInstances = Arrays.asList("server1", "server2"); + when(_helixResourceManager.getServerInstancesForTable(tableName, TableType.OFFLINE)).thenReturn(mockInstances); + + // Mock disk usage with high utilization + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + DiskUsageInfo diskUsageInfo1 = new DiskUsageInfo("server1", DISK_UTILIZATION_PATH, 1000L, 900L, + System.currentTimeMillis()); // Above threshold (90%) + diskUsageInfoMap.put("server1", diskUsageInfo1); + + DiskUsageInfo diskUsageInfo2 = new DiskUsageInfo("server2", DISK_UTILIZATION_PATH, 2000L, 1900L, + System.currentTimeMillis()); // Below threshold (50%) + diskUsageInfoMap.put("server2", diskUsageInfo2); + ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); + + boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName); + Assert.assertFalse(result); + } + + @Test + public void testComputeDiskUtilizationValidInstances() + throws InvalidConfigException { + Set<String> instances = new HashSet<>(Arrays.asList("server1", "server2")); + + // Mock admin endpoints + BiMap<String, String> instanceAdminEndpoints = HashBiMap.create(); + instanceAdminEndpoints.put("server1", "http://server1"); + instanceAdminEndpoints.put("server2", "http://server2"); + when(_helixResourceManager.getDataInstanceAdminEndpoints(instances)).thenReturn(instanceAdminEndpoints); + + // Mock responses + Map<String, String> responseMap = new HashMap<>(); + responseMap.put("http://server1" + DiskUtilizationChecker.DISK_UTILIZATION_API_PATH, + "{ \"instanceId\": \"server1\", \"totalSpaceBytes\": 1000, \"usedSpaceBytes\": 500 }"); + responseMap.put("http://server2" + DiskUtilizationChecker.DISK_UTILIZATION_API_PATH, + "{ \"instanceId\": \"server2\", \"totalSpaceBytes\": 2000, \"usedSpaceBytes\": 1500 }"); + + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + new CompletionServiceHelper.CompletionServiceResponse(); + serviceResponse._httpResponses = responseMap; + + CompletionServiceHelper completionServiceHelper = mock(CompletionServiceHelper.class); + when(completionServiceHelper.doMultiGetRequest(anyList(), anyString(), anyBoolean(), anyMap(), anyInt(), + anyString())).thenReturn(serviceResponse); + + _diskUtilizationChecker.computeDiskUtilization(instanceAdminEndpoints.inverse(), completionServiceHelper); + + DiskUsageInfo diskUsageInfo1 = ResourceUtilizationInfo.getDiskUsageInfo("server1"); + DiskUsageInfo diskUsageInfo2 = ResourceUtilizationInfo.getDiskUsageInfo("server2"); + + Assert.assertNotNull(diskUsageInfo1); + Assert.assertEquals(diskUsageInfo1.getTotalSpaceBytes(), 1000L); + Assert.assertEquals(diskUsageInfo1.getUsedSpaceBytes(), 500L); + + Assert.assertNotNull(diskUsageInfo2); + Assert.assertEquals(diskUsageInfo2.getTotalSpaceBytes(), 2000L); + Assert.assertEquals(diskUsageInfo2.getUsedSpaceBytes(), 1500L); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java new file mode 100644 index 0000000000..7ca553f2b3 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.PauseStatusDetails; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.spi.config.table.PauseState; +import org.apache.pinot.spi.config.table.TableConfig; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class RealtimeSegmentValidationManagerTest { + @Mock + private PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; + + @Mock + private ResourceUtilizationManager _resourceUtilizationManager; + + @Mock + private PinotHelixResourceManager _pinotHelixResourceManager; + + @Mock + private StorageQuotaChecker _storageQuotaChecker; + + @Mock + private ControllerMetrics _controllerMetrics; + + private AutoCloseable _mocks; + private RealtimeSegmentValidationManager _realtimeSegmentValidationManager; + + @BeforeMethod + public void setup() { + ControllerConf controllerConf = new ControllerConf(); + _mocks = MockitoAnnotations.openMocks(this); + _realtimeSegmentValidationManager = + new RealtimeSegmentValidationManager(controllerConf, _pinotHelixResourceManager, null, + _llcRealtimeSegmentManager, null, _controllerMetrics, _storageQuotaChecker, _resourceUtilizationManager); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + + @DataProvider(name = "testCases") + public Object[][] testCases() { + return new Object[][]{ + // Table is paused due to admin intervention, should return false + {true, PauseState.ReasonCode.ADMINISTRATIVE, false, false, false}, + + // Resource utilization exceeded and pause state is updated, should return false + {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, true, false, false}, + + // Resource utilization is within limits but was previously paused due to resource utilization, + // should return true + {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, false, false, true}, + + // Resource utilization is within limits but was previously paused due to storage quota exceeded, + // should return false + {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, true, false}, + + // Storage quota exceeded, should return false + {false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, true, false}, + + // Storage quota within limits but was previously paused due to storage quota exceeded, should return true + {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, false, true}}; + } + + @Test(dataProvider = "testCases") + public void testShouldEnsureConsuming(boolean isTablePaused, PauseState.ReasonCode reasonCode, + boolean isResourceUtilizationExceeded, boolean isQuotaExceeded, boolean expectedResult) { + String tableName = "testTable_REALTIME"; + PauseStatusDetails pauseStatus = mock(PauseStatusDetails.class); + TableConfig tableConfig = mock(TableConfig.class); + + when(pauseStatus.getPauseFlag()).thenReturn(isTablePaused); + when(pauseStatus.getReasonCode()).thenReturn(reasonCode); + when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName)).thenReturn(pauseStatus); + when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)).thenReturn( + !isResourceUtilizationExceeded); + when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)).thenReturn(isQuotaExceeded); + + boolean result = _realtimeSegmentValidationManager.shouldEnsureConsuming(tableName); + + Assert.assertEquals(result, expectedResult); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java new file mode 100644 index 0000000000..11f3d77b1f --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ResourceUtilizationInfoTest { + + private static final String INSTANCE_ID_1 = "server-1"; + private static final String INSTANCE_ID_2 = "server-2"; + + private DiskUsageInfo _diskUsageInfo1; + private DiskUsageInfo _diskUsageInfo2; + + @BeforeMethod + public void setUp() { + // Create sample DiskUsageInfo objects + _diskUsageInfo1 = new DiskUsageInfo(INSTANCE_ID_1, "/path/to/disk1", 1000L, 500L, System.currentTimeMillis()); + _diskUsageInfo2 = new DiskUsageInfo(INSTANCE_ID_2, "/path/to/disk2", 2000L, 1500L, System.currentTimeMillis()); + } + + @Test + public void testDiskUsageInfo() { + // Set disk usage info for multiple instances + Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>(); + diskUsageInfoMap.put(INSTANCE_ID_1, _diskUsageInfo1); + diskUsageInfoMap.put(INSTANCE_ID_2, _diskUsageInfo2); + ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap); + + // Validate instance 1 + DiskUsageInfo diskUsageInfoInstance1 = ResourceUtilizationInfo.getDiskUsageInfo(INSTANCE_ID_1); + Assert.assertNotNull(diskUsageInfoInstance1); + Assert.assertEquals(diskUsageInfoInstance1.getTotalSpaceBytes(), 1000L); + Assert.assertEquals(diskUsageInfoInstance1.getUsedSpaceBytes(), 500L); + + // Validate instance 2 + DiskUsageInfo diskUsageInfoInstance2 = ResourceUtilizationInfo.getDiskUsageInfo(INSTANCE_ID_2); + Assert.assertNotNull(diskUsageInfoInstance2); + Assert.assertEquals(diskUsageInfoInstance2.getTotalSpaceBytes(), 2000L); + Assert.assertEquals(diskUsageInfoInstance2.getUsedSpaceBytes(), 1500L); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java new file mode 100644 index 0000000000..bb61d6e083 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import org.apache.pinot.controller.ControllerConf; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ResourceUtilizationManagerTest { + + private DiskUtilizationChecker _diskUtilizationChecker; + private ControllerConf _controllerConf; + private ResourceUtilizationManager _resourceUtilizationManager; + private final String _testTable = "myTable_OFFLINE"; + + @BeforeMethod + public void setUp() { + _diskUtilizationChecker = Mockito.mock(DiskUtilizationChecker.class); + _controllerConf = Mockito.mock(ControllerConf.class); + } + + @Test + public void testIsResourceUtilizationWithinLimitsWhenCheckIsDisabled() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(false); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + + boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); + Assert.assertTrue(result, "Resource utilization should be within limits when the check is disabled"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testIsResourceUtilizationWithinLimitsWithNullTableName() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + + _resourceUtilizationManager.isResourceUtilizationWithinLimits(null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testIsResourceUtilizationWithinLimitsWithEmptyTableName() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + + _resourceUtilizationManager.isResourceUtilizationWithinLimits(""); + } + + @Test + public void testIsResourceUtilizationWithinLimitsWhenCheckIsEnabled() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(true); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + + boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); + Assert.assertTrue(result, "Resource utilization should be within limits when disk check returns true"); + } + + @Test + public void testIsResourceUtilizationWithinLimitsWhenCheckFails() { + Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true); + Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(false); + _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker); + + boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable); + Assert.assertFalse(result, "Resource utilization should not be within limits when disk check returns false"); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java new file mode 100644 index 0000000000..b2822e0e20 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DiskUtilization { + private static final Logger LOGGER = LoggerFactory.getLogger(DiskUtilization.class); + + private DiskUtilization() { + } + /** + * Computes the disk usage for the file store containing the given path using NIO. + */ + public static DiskUsageInfo computeDiskUsage(String instanceId, String pathStr) throws IOException { + if (StringUtils.isEmpty(instanceId)) { + throw new IllegalArgumentException("InstanceID cannot be null or empty while computing disk utilization."); + } + + if (StringUtils.isEmpty(pathStr)) { + throw new IllegalArgumentException("Path cannot be null or empty while computing disk utilization."); + } + + Path path = Paths.get(pathStr); + if (!Files.exists(path)) { + throw new IllegalArgumentException("The path " + pathStr + " does not exist."); + } + + // Obtain the file store corresponding to the path. + FileStore store = Files.getFileStore(path); + + long totalSpace = store.getTotalSpace(); + // getUnallocatedSpace() returns the number of bytes that are unallocated in the file store, + // which corresponds to the "free space" on disk. + long freeSpace = store.getUnallocatedSpace(); + long usedSpace = totalSpace - freeSpace; + LOGGER.info("Disk usage for instance: {} at path: {} totalSpace: {}, usedSpace: {}", instanceId, pathStr, + totalSpace, usedSpace); + return new DiskUsageInfo(instanceId, path.toString(), totalSpace, usedSpace, System.currentTimeMillis()); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java index 8c0618f64d..d2a7a0c4f3 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java @@ -27,6 +27,7 @@ import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,10 +36,15 @@ import javax.inject.Named; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.server.api.AdminApiApplication; @@ -97,4 +103,22 @@ public class InstanceResource { Map<String, String> pools = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); return pools == null ? Collections.emptyMap() : pools; } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/diskUtilization") + @ApiOperation(value = "Show disk utilization", notes = "Disk capacity and usage shown in bytes") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 400, message = "Bad Request – Invalid disk utilization path in header") + }) + public String getDiskUsageInfo(@Context HttpHeaders headers) + throws WebApplicationException, IOException { + String pathStr = headers.getHeaderString("diskUtilizationPath"); + if (StringUtils.isEmpty(pathStr)) { + throw new WebApplicationException("Invalid disk utilization path in header", 400); + } + DiskUsageInfo diskUsageInfo = DiskUtilization.computeDiskUsage(_instanceId, pathStr); + return ResourceUtils.convertToJsonString(diskUsageInfo); + } } diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java new file mode 100644 index 0000000000..f3648b48dc --- /dev/null +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api; + +import java.io.IOException; +import org.apache.pinot.common.restlet.resources.DiskUsageInfo; +import org.apache.pinot.server.api.resources.DiskUtilization; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class DiskUtilizationTest { + + private static final String TEST_PATH = "/tmp"; + + @Test + public void testComputeDiskUsageForExistingPath() + throws IOException { + DiskUsageInfo usage = DiskUtilization.computeDiskUsage("myInstanceId", TEST_PATH); + + // Assert that total space is positive. + Assert.assertTrue(usage.getTotalSpaceBytes() > 0, "Total space should be positive."); + + // Assert that used space is non-negative and not more than the total space. + Assert.assertTrue(usage.getUsedSpaceBytes() >= 0, "Used space should be non-negative."); + Assert.assertTrue(usage.getUsedSpaceBytes() <= usage.getTotalSpaceBytes(), + "Used space should be less than or equal to total space."); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testComputeDiskUsageForNonExistingPath() + throws IOException { + String nonExistentPath = "non/existent/path/for/testing"; + DiskUtilization.computeDiskUsage("myInstanceId", nonExistentPath); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java index 6a0f8c63e9..4781bfbdac 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java @@ -70,6 +70,6 @@ public class PauseState extends BaseJsonConfig { } public enum ReasonCode { - ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED + ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED, RESOURCE_UTILIZATION_LIMIT_EXCEEDED } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org