This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch cleanup_dead_minion_instances in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4c8cee3bf45a7de61d5cf8e13f78402119275db6 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Thu Feb 4 02:32:04 2021 -0800 Adding a controller periodic task to clean up dead minion instances --- .../pinot/common/metrics/ControllerGauge.java | 5 +- .../apache/pinot/controller/ControllerConf.java | 24 ++++++++ .../apache/pinot/controller/ControllerStarter.java | 8 +++ .../core/minion/MinionInstancesCleanupTask.java | 60 ++++++++++++++++++ .../pinot/controller/helix/ControllerTest.java | 72 ++++++++++++++++++++++ .../minion/MinionInstancesCleanupTaskTest.java | 66 ++++++++++++++++++++ .../apache/pinot/spi/config/tenant/TenantRole.java | 2 +- 7 files changed, 235 insertions(+), 2 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 3f69f45..869db11 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false), // Number of scheduled Cron jobs - CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false); + CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false), + + // Number of dropped minion instances + DROPPED_MINION_INSTANCES("droppedMinionInstances", true); private final String gaugeName; private final String unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 76ab5ec..61d34cf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -88,6 +88,10 @@ public class ControllerConf extends PinotConfiguration { public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds"; public static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds"; + public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = "controller.minion.instances.cleanup.task.frequencyInSeconds"; + public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = "controller.minion.instances.cleanup.task.initialDelaySeconds"; + + public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled"; @Deprecated // RealtimeSegmentRelocator has been rebranded as SegmentRelocator @@ -131,6 +135,8 @@ public class ControllerConf extends PinotConfiguration { private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled + private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. + private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60; private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60; } @@ -539,6 +545,24 @@ public class ControllerConf extends PinotConfiguration { setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); } + public long getMinionInstancesCleanupTaskFrequencyInSeconds() { + return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS, + ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS); + } + + public void setMinionInstancesCleanupTaskFrequencyInSeconds(int frequencyInSeconds) { + setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); + } + + public long getMinionInstancesCleanupTaskInitialDelaySeconds() { + return getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, + ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds()); + } + + public void setMinionInstancesCleanupTaskInitialDelaySeconds(int initialDelaySeconds) { + setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS, Integer.toString(initialDelaySeconds)); + } + public int getDefaultTableMinReplicas() { return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index 233f85e..0845283 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -63,6 +63,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.api.ControllerAdminApiApplication; import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; +import org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask; import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.controller.api.resources.ControllerFilePathProvider; import org.apache.pinot.controller.api.resources.InvalidControllerConfigException; @@ -140,6 +141,7 @@ public class ControllerStarter implements ServiceStartable { private SegmentCompletionManager _segmentCompletionManager; private LeadControllerManager _leadControllerManager; private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList; + private MinionInstancesCleanupTask _minionInstancesCleanupTask; public ControllerStarter(ControllerConf conf) { _config = conf; @@ -252,6 +254,10 @@ public class ControllerStarter implements ServiceStartable { return _taskManager; } + public MinionInstancesCleanupTask getMinionInstancesCleanupTask() { + return _minionInstancesCleanupTask; + } + @Override public ServiceRole getServiceRole() { return ServiceRole.CONTROLLER; @@ -580,6 +586,8 @@ public class ControllerStarter implements ServiceStartable { _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); periodicTasks.add(_segmentRelocator); + _minionInstancesCleanupTask = new MinionInstancesCleanupTask(_helixResourceManager, _config, _controllerMetrics); + periodicTasks.add(_minionInstancesCleanupTask); return periodicTasks; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java new file mode 100644 index 0000000..0d0feed --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.core.periodictask.BasePeriodicTask; + + +/** + * A periodic task to clean up offline Minion instances to not spam Helix. + */ +public class MinionInstancesCleanupTask extends BasePeriodicTask { + protected final PinotHelixResourceManager _pinotHelixResourceManager; + protected final ControllerMetrics _controllerMetrics; + + public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, + ControllerMetrics controllerMetrics) { + super("MinionInstancesCleanupTask", controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(), + controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _controllerMetrics = controllerMetrics; + } + + @Override + protected void runTask() { + List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances()); + offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); + for (String offlineInstance : offlineInstances) { + if (offlineInstance.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) { + PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); + if (response.isSuccessful()) { + _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); + } + } + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index a88799d..34dece7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -93,6 +93,7 @@ public abstract class ControllerTest { new File(FileUtils.getTempDirectoryPath(), "test-controller-" + System.currentTimeMillis()).getAbsolutePath(); protected static final String BROKER_INSTANCE_ID_PREFIX = "Broker_localhost_"; protected static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_"; + protected static final String MINION_INSTANCE_ID_PREFIX = "Minion_localhost_"; protected final List<HelixManager> _fakeInstanceHelixManagers = new ArrayList<>(); @@ -381,6 +382,77 @@ public abstract class ControllerTest { } } + protected void addFakeMinionInstancesToAutoJoinHelixCluster(int numInstances) + throws Exception { + for (int i = 0; i < numInstances; i++) { + addFakeMinionInstanceToAutoJoinHelixCluster(MINION_INSTANCE_ID_PREFIX + i); + } + } + + protected void addFakeMinionInstanceToAutoJoinHelixCluster(String instanceId) + throws Exception { + HelixManager helixManager = + HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT, + ZkStarter.DEFAULT_ZK_STR); + helixManager.getStateMachineEngine() + .registerStateModelFactory(FakeMinionResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF, + FakeMinionResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE); + helixManager.connect(); + HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); + helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, UNTAGGED_MINION_INSTANCE); + _fakeInstanceHelixManagers.add(helixManager); + } + + public static class FakeMinionResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private static final String STATE_MODEL_DEF = "MinionResourceOnlineOfflineStateModel"; + private static final FakeMinionResourceOnlineOfflineStateModelFactory FACTORY_INSTANCE = + new FakeMinionResourceOnlineOfflineStateModelFactory(); + private static final FakeMinionResourceOnlineOfflineStateModel STATE_MODEL_INSTANCE = + new FakeMinionResourceOnlineOfflineStateModel(); + + private FakeMinionResourceOnlineOfflineStateModelFactory() { + } + + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + return STATE_MODEL_INSTANCE; + } + + @SuppressWarnings("unused") + @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE") + public static class FakeMinionResourceOnlineOfflineStateModel extends StateModel { + private static final Logger LOGGER = LoggerFactory.getLogger(FakeMinionResourceOnlineOfflineStateModel.class); + + private FakeMinionResourceOnlineOfflineStateModel() { + } + + @Transition(from = "OFFLINE", to = "ONLINE") + public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { + LOGGER.debug("onBecomeOnlineFromOffline(): {}", message); + } + + @Transition(from = "OFFLINE", to = "DROPPED") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + LOGGER.debug("onBecomeDroppedFromOffline(): {}", message); + } + + @Transition(from = "ONLINE", to = "OFFLINE") + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + LOGGER.debug("onBecomeOfflineFromOnline(): {}", message); + } + + @Transition(from = "ONLINE", to = "DROPPED") + public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { + LOGGER.debug("onBecomeDroppedFromOnline(): {}", message); + } + + @Transition(from = "ERROR", to = "OFFLINE") + public void onBecomeOfflineFromError(Message message, NotificationContext context) { + LOGGER.debug("onBecomeOfflineFromError(): {}", message); + } + } + } + protected void stopFakeInstances() { for (HelixManager helixManager : _fakeInstanceHelixManagers) { helixManager.disconnect(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java new file mode 100644 index 0000000..363c795 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.controller.helix.ControllerTest; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class MinionInstancesCleanupTaskTest extends ControllerTest { + @BeforeClass + public void setup() + throws Exception { + startZk(); + startController(); + } + + @Test + public void testMinionInstancesCleanupTask() + throws Exception { + MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask(); + minionInstancesCleanupTask.runTask(); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + addFakeMinionInstancesToAutoJoinHelixCluster(3); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + stopFakeInstance("Minion_localhost_0"); + minionInstancesCleanupTask.runTask(); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); + stopFakeInstance("Minion_localhost_1"); + minionInstancesCleanupTask.runTask(); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); + stopFakeInstance("Minion_localhost_2"); + minionInstancesCleanupTask.runTask(); + Assert.assertEquals( + _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); + } + + @AfterClass + public void teardown() { + stopController(); + stopZk(); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java index 3298248..6b710dd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/tenant/TenantRole.java @@ -19,5 +19,5 @@ package org.apache.pinot.spi.config.tenant; public enum TenantRole { - SERVER, BROKER + SERVER, BROKER, MINION } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org