This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 98c39a5f71 Repurpose MinionInstancesCleanupTask to StaleInstancesCleanupTask to remove stale broker and server instances as well. (#10027) 98c39a5f71 is described below commit 98c39a5f718b7d1f623cd4f856a720206773544d Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Wed Jan 4 21:47:42 2023 -0800 Repurpose MinionInstancesCleanupTask to StaleInstancesCleanupTask to remove stale broker and server instances as well. (#10027) --- .../pinot/common/metrics/ControllerGauge.java | 8 +- .../pinot/controller/BaseControllerStarter.java | 14 +- .../apache/pinot/controller/ControllerConf.java | 52 ++++++++ .../core/cleanup/StaleInstancesCleanupTask.java | 146 +++++++++++++++++++++ .../core/minion/MinionInstancesCleanupTask.java | 88 ------------- .../StaleInstancesCleanupTaskStatelessTest.java | 138 +++++++++++++++++++ .../MinionInstancesCleanupTaskStatelessTest.java | 82 ------------ 7 files changed, 350 insertions(+), 178 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index b44bd8e999..64ced0203c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -101,9 +101,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // Number of Tasks Status TASK_STATUS("taskStatus", false), - // Number of dropped minion instances + // Number of dropped stale minion instances DROPPED_MINION_INSTANCES("droppedMinionInstances", true), + // Number of dropped stale broker instances + DROPPED_BROKER_INSTANCES("droppedBrokerInstances", true), + + // Number of dropped stale server instances + DROPPED_SERVER_INSTANCES("droppedServerInstances", true), + // Number of online minion instances ONLINE_MINION_INSTANCES("onlineMinionInstances", true), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index f6ebb5006d..83db19eccf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -76,7 +76,7 @@ import org.apache.pinot.controller.api.resources.InvalidControllerConfigExceptio import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask; +import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; @@ -164,7 +164,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected SegmentCompletionManager _segmentCompletionManager; protected LeadControllerManager _leadControllerManager; protected List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList; - protected MinionInstancesCleanupTask _minionInstancesCleanupTask; + protected StaleInstancesCleanupTask _staleInstancesCleanupTask; protected TaskMetricsEmitter _taskMetricsEmitter; protected MultiThreadedHttpConnectionManager _connectionManager; @@ -295,8 +295,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { return _taskManager; } - public MinionInstancesCleanupTask getMinionInstancesCleanupTask() { - return _minionInstancesCleanupTask; + public StaleInstancesCleanupTask getStaleInstancesCleanupTask() { + return _staleInstancesCleanupTask; } @Override @@ -687,9 +687,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService, _connectionManager); periodicTasks.add(_segmentRelocator); - _minionInstancesCleanupTask = - new MinionInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); - periodicTasks.add(_minionInstancesCleanupTask); + _staleInstancesCleanupTask = + new StaleInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); + periodicTasks.add(_staleInstancesCleanupTask); _taskMetricsEmitter = new TaskMetricsEmitter(_helixResourceManager, _helixTaskResourceManager, _leadControllerManager, _config, _controllerMetrics); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index e24530ab4c..015b1e90de 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -129,16 +129,27 @@ public class ControllerConf extends PinotConfiguration { @Deprecated public static final String DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = "controller.minion.instances.cleanup.task.frequencyInSeconds"; + @Deprecated public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD = "controller.minion.instances.cleanup.task.frequencyPeriod"; + @Deprecated public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = "controller.minion.instances.cleanup.task.initialDelaySeconds"; // Deprecated as of 0.8.0 @Deprecated public static final String DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS = "controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionSeconds"; + @Deprecated public static final String MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD = "controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod"; + + public static final String STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD = + "controller.stale.instances.cleanup.task.frequencyPeriod"; + public static final String STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = + "controller.stale.instances.cleanup.task.initialDelaySeconds"; + public static final String STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD = + "controller.stale.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod"; + // Deprecated as of 0.8.0 @Deprecated public static final String DEPRECATED_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = @@ -209,7 +220,9 @@ public class ControllerConf extends PinotConfiguration { private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled + @Deprecated private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. + @Deprecated private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS = 60 * 60; // 1 Hour. @@ -699,6 +712,7 @@ public class ControllerConf extends PinotConfiguration { Integer.toString(frequencyInSeconds)); } + @Deprecated public int getMinionInstancesCleanupTaskFrequencyInSeconds() { return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD)) .map(period -> (int) convertPeriodToSeconds(period)).orElseGet( @@ -706,21 +720,25 @@ public class ControllerConf extends PinotConfiguration { ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS)); } + @Deprecated public void setMinionInstancesCleanupTaskFrequencyInSeconds(int frequencyInSeconds) { setProperty(ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); } + @Deprecated public long getMinionInstancesCleanupTaskInitialDelaySeconds() { return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds()); } + @Deprecated public void setMinionInstancesCleanupTaskInitialDelaySeconds(int initialDelaySeconds) { setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, Integer.toString(initialDelaySeconds)); } + @Deprecated public int getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds() { return Optional.ofNullable( getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD)) @@ -731,12 +749,46 @@ public class ControllerConf extends PinotConfiguration { DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS)); } + @Deprecated public void setMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds(int maxOfflineTimeRangeInSeconds) { setProperty( ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS, Integer.toString(maxOfflineTimeRangeInSeconds)); } + public int getStaleInstancesCleanupTaskFrequencyInSeconds() { + return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD)) + .map(period -> (int) convertPeriodToSeconds(period)) + // Backward compatible for existing users who configured MinionInstancesCleanupTask + .orElse(getMinionInstancesCleanupTaskFrequencyInSeconds()); + } + + public void setStaleInstanceCleanupTaskFrequencyInSeconds(String frequencyPeriod) { + setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD, frequencyPeriod); + } + + public long getStaleInstanceCleanupTaskInitialDelaySeconds() { + return getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, + // Backward compatible for existing users who configured MinionInstancesCleanupTask + getMinionInstancesCleanupTaskInitialDelaySeconds()); + } + + public void setStaleInstanceCleanupTaskInitialDelaySeconds(long initialDelaySeconds) { + setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, initialDelaySeconds); + } + + public int getStaleInstancesCleanupTaskInstancesRetentionInSeconds() { + return Optional.ofNullable( + getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD)) + .map(period -> (int) convertPeriodToSeconds(period)) + // Backward compatible for existing users who configured MinionInstancesCleanupTask + .orElse(getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds()); + } + + public void setStaleInstancesCleanupTaskInstancesRetentionPeriod(String retentionPeriod) { + setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD, retentionPeriod); + } + public int getDefaultTableMinReplicas() { return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java new file mode 100644 index 0000000000..b257462985 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.cleanup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Automatically removes stale instances from the cluster to not spam Helix. + * Stale instance is the instance not in use (not hosting any data or query) and has been in the offline status for more + * than the stale instance retention time. + */ +public class StaleInstancesCleanupTask extends BasePeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(StaleInstancesCleanupTask.class); + private final static String TASK_NAME = "StaleInstancesCleanupTask"; + + protected final PinotHelixResourceManager _pinotHelixResourceManager; + protected final LeadControllerManager _leadControllerManager; + protected final ControllerMetrics _controllerMetrics; + // This applies to both broker and server instances. + private final long _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds; + + public StaleInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { + super(TASK_NAME, controllerConf.getStaleInstancesCleanupTaskFrequencyInSeconds(), + controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _leadControllerManager = leadControllerManager; + _controllerMetrics = controllerMetrics; + _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds = + controllerConf.getStaleInstancesCleanupTaskInstancesRetentionInSeconds() * 1000L; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + // Make it so that only one controller is responsible for cleaning up minion instances. + if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { + return; + } + + List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances()); + offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); + + Set<String> serverInstancesInUse = getServerInstancesInUse(); + Set<String> brokerInstancesInUse = getBrokerInstancesInUse(); + + for (String offlineInstance : offlineInstances) { + // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK session expire (e.g. due to network issue), + // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race condition can happen when this task is + // running. + // In order to double confirm the live status of an instance, the field "LAST_OFFLINE_TIME" in ZNode under + // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value is "-1", that means the instance is + // ONLINE; + // if the value is a timestamp, that means the instance starts to be OFFLINE since that time. + if (InstanceTypeUtils.isMinion(offlineInstance)) { + // Drop the minion instance if it has been offline for more than a period of this task. + if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance, + _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) { + LOGGER.info("Dropping minion instance: {}", offlineInstance); + PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); + if (response.isSuccessful()) { + _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); + } + } + continue; + } + + // Drop the broker instance if it has been offline for more than a period of this task. + if (InstanceTypeUtils.isBroker(offlineInstance) && !brokerInstancesInUse.contains(offlineInstance)) { + if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance, + _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) { + LOGGER.info("Dropping broker instance: {}", offlineInstance); + PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); + if (response.isSuccessful()) { + _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES, 1); + } + } + continue; + } + + // Drop the server instance if it has been offline for more than a period of this task. + if (InstanceTypeUtils.isServer(offlineInstance) && !serverInstancesInUse.contains(offlineInstance)) { + if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance, + _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) { + LOGGER.info("Dropping server instance: {}", offlineInstance); + PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); + if (response.isSuccessful()) { + _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES, 1); + } + } + } + } + } + + private Set<String> getBrokerInstancesInUse() { + Set<String> brokerInstancesInUse = new HashSet<>(); + final IdealState brokerResource = _pinotHelixResourceManager.getHelixAdmin() + .getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(), + CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + brokerResource.getPartitionSet() + .forEach(table -> brokerInstancesInUse.addAll(brokerResource.getInstanceSet(table))); + return brokerInstancesInUse; + } + + private Set<String> getServerInstancesInUse() { + Set<String> serverInstancesInUse = new HashSet<>(); + _pinotHelixResourceManager.getAllTables().forEach(tableName -> serverInstancesInUse.addAll( + Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName)) + .map(is -> is.getInstanceSet(tableName)).orElse(Collections.emptySet()))); + return serverInstancesInUse; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java deleted file mode 100644 index 09392c6b00..0000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.minion; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import org.apache.pinot.common.metrics.ControllerGauge; -import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.LeadControllerManager; -import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; -import org.apache.pinot.core.periodictask.BasePeriodicTask; -import org.apache.pinot.spi.utils.InstanceTypeUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A periodic task to clean up offline Minion instances to not spam Helix. - */ -public class MinionInstancesCleanupTask extends BasePeriodicTask { - private static final Logger LOGGER = LoggerFactory.getLogger(MinionInstancesCleanupTask.class); - private final static String TASK_NAME = "MinionInstancesCleanupTask"; - protected final PinotHelixResourceManager _pinotHelixResourceManager; - protected final LeadControllerManager _leadControllerManager; - protected final ControllerMetrics _controllerMetrics; - private final long _minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds; - - public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, - LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { - super(TASK_NAME, controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(), - controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds()); - _pinotHelixResourceManager = pinotHelixResourceManager; - _leadControllerManager = leadControllerManager; - _controllerMetrics = controllerMetrics; - _minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds = - controllerConf.getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds() * 1000L; - } - - @Override - protected void runTask(Properties periodicTaskProperties) { - // Make it so that only one controller is responsible for cleaning up minion instances. - if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { - return; - } - - List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances()); - offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); - for (String offlineInstance : offlineInstances) { - // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK session expire (e.g. due to network issue), - // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race condition can happen when this task is - // running. - // In order to double confirm the live status of an instance, the field "LAST_OFFLINE_TIME" in ZNode under - // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value is "-1", that means the instance is - // ONLINE; - // if the value is a timestamp, that means the instance starts to be OFFLINE since that time. - if (InstanceTypeUtils.isMinion(offlineInstance)) { - // Drop the minion instance if it has been offline for more than a period of this task. - if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance, - _minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) { - LOGGER.info("Dropping minion instance: {}", offlineInstance); - PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); - if (response.isSuccessful()) { - _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); - } - } - } - } - } -} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java new file mode 100644 index 0000000000..4bc0b4cd10 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.cleanup; + +import java.util.Map; +import java.util.Properties; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.ControllerTest; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +@Test(groups = "stateless") +public class StaleInstancesCleanupTaskStatelessTest extends ControllerTest { + @BeforeClass + public void setup() + throws Exception { + startZk(); + startController(); + } + + @Test + public void testStaleInstancesCleanupTaskForBrokers() + throws Exception { + StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 0); + addFakeBrokerInstancesToAutoJoinHelixCluster(3, true); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 0); + stopFakeInstance("Broker_localhost_0"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 1); + stopFakeInstance("Broker_localhost_1"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 2); + stopFakeInstance("Broker_localhost_2"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 3); + } + + @Test + public void testStaleInstancesCleanupTaskForServers() + throws Exception { + StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 0); + addFakeServerInstancesToAutoJoinHelixCluster(3, true); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 0); + stopFakeInstance("Server_localhost_0"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 1); + stopFakeInstance("Server_localhost_1"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 2); + stopFakeInstance("Server_localhost_2"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 3); + } + + @Test + public void testStaleInstancesCleanupTaskForMinions() + throws Exception { + StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + addFakeMinionInstancesToAutoJoinHelixCluster(3); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + stopFakeInstance("Minion_localhost_0"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); + stopFakeInstance("Minion_localhost_1"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); + stopFakeInstance("Minion_localhost_2"); + Thread.sleep(1000); + staleInstancesCleanupTask.runTask(new Properties()); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); + } + + @Override + public Map<String, Object> getDefaultControllerConfiguration() { + Map<String, Object> properties = super.getDefaultControllerConfiguration(); + // Override the cleanup before deletion period so that test can avoid stuck failure + properties.put(ControllerConf.ControllerPeriodicTasksConf. + MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD, "1s"); + properties.put(ControllerConf.ControllerPeriodicTasksConf. + STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD, "1s"); + return properties; + } + + @AfterClass + public void teardown() { + stopController(); + stopZk(); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java deleted file mode 100644 index 950f7c5492..0000000000 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.minion; - -import java.util.Map; -import java.util.Properties; -import org.apache.pinot.common.metrics.ControllerGauge; -import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.helix.ControllerTest; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - - -@Test(groups = "stateless") -public class MinionInstancesCleanupTaskStatelessTest extends ControllerTest { - @BeforeClass - public void setup() - throws Exception { - startZk(); - startController(); - } - - @Test - public void testMinionInstancesCleanupTask() - throws Exception { - MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask(); - minionInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); - addFakeMinionInstancesToAutoJoinHelixCluster(3); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); - stopFakeInstance("Minion_localhost_0"); - Thread.sleep(1000); - minionInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); - stopFakeInstance("Minion_localhost_1"); - Thread.sleep(1000); - minionInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); - stopFakeInstance("Minion_localhost_2"); - Thread.sleep(1000); - minionInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); - } - - @Override - public Map<String, Object> getDefaultControllerConfiguration() { - Map<String, Object> properties = super.getDefaultControllerConfiguration(); - // Override the cleanup before deletion period so that test can avoid stuck failure - properties.put(ControllerConf.ControllerPeriodicTasksConf. - MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD, "1s"); - return properties; - } - - @AfterClass - public void teardown() { - stopController(); - stopZk(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org