This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-MinionInstancesCleanupTask in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 70907db03ac722b47873882c66558416b051a051 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Fri Jul 2 11:27:14 2021 -0700 Fix race condition in MinionInstancesCleanupTask --- .../pinot/controller/BaseControllerStarter.java | 2 +- .../helix/core/PinotHelixResourceManager.java | 29 +++++++++++++++++ .../core/minion/MinionInstancesCleanupTask.java | 37 ++++++++++++++++++---- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 5dcbd65..68d73d7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -612,7 +612,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); periodicTasks.add(_segmentRelocator); - _minionInstancesCleanupTask = new MinionInstancesCleanupTask(_helixResourceManager, _config, _controllerMetrics); + _minionInstancesCleanupTask = new MinionInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); periodicTasks.add(_minionInstancesCleanupTask); return periodicTasks; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index bb01ff1..5397e53 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -66,6 +66,7 @@ import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; @@ -462,6 +463,34 @@ public class PinotHelixResourceManager { } /** + * Validates whether an instance is offline. + * Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK session expire (e.g. due to network issue), + * the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race condition can happen when this task is running. + * In order to double confirm the live status of an instance, the field "LAST_OFFLINE_TIME" in ZNode under + * "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value is "-1", that means the instance is ONLINE; + * if the value is a timestamp, that means the instance starts to be OFFLINE since that time. + * @param instanceId instance id + * @param offlineTimeRangeMs the time range that it's valid for an instance to be offline + */ + public boolean isInstanceOffline(String instanceId, long offlineTimeRangeMs) { + // Check if the instance is included in /LIVEINSTANCES + if (_helixDataAccessor.getProperty(_keyBuilder.liveInstance(instanceId)) != null) { + return false; + } + ParticipantHistory participantHistory = _helixDataAccessor.getProperty(_keyBuilder.participantHistory(instanceId)); + long lastOfflineTime = participantHistory.getLastOfflineTime(); + if (lastOfflineTime == -1) { + return false; + } + if (System.currentTimeMillis() - lastOfflineTime > offlineTimeRangeMs) { + LOGGER.info("Instance: {} has been offline for more than {}ms", instanceId, offlineTimeRangeMs); + return true; + } + // Still within the offline time range (e.g. due to zk session expire). + return false; + } + + /** * Tenant related APIs */ // TODO: move tenant related APIs here diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java index 87fd80b..838b652 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java @@ -23,36 +23,61 @@ import java.util.List; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.core.periodictask.BasePeriodicTask; import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A periodic task to clean up offline Minion instances to not spam Helix. */ public class MinionInstancesCleanupTask extends BasePeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(MinionInstancesCleanupTask.class); + private final static String TASK_NAME = "MinionInstancesCleanupTask"; protected final PinotHelixResourceManager _pinotHelixResourceManager; + protected final LeadControllerManager _leadControllerManager; protected final ControllerMetrics _controllerMetrics; + private final long _minionInstanceCleanupTaskFrequencyInMilliseconds; - public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics) { - super("MinionInstancesCleanupTask", controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(), + public MinionInstancesCleanupTask(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { + super(TASK_NAME, controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(), controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds()); _pinotHelixResourceManager = pinotHelixResourceManager; + _leadControllerManager = leadControllerManager; _controllerMetrics = controllerMetrics; + _minionInstanceCleanupTaskFrequencyInMilliseconds = + controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds() * 1000L; } @Override protected void runTask() { + // Make it so that only one controller returns the metric for all the tasks. + if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { + return; + } + List<String> offlineInstances = new ArrayList<>(_pinotHelixResourceManager.getAllInstances()); offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList()); for (String offlineInstance : offlineInstances) { + // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK session expire (e.g. due to network issue), + // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race condition can happen when this task is running. + // In order to double confirm the live status of an instance, the field "LAST_OFFLINE_TIME" in ZNode under + // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value is "-1", that means the instance is ONLINE; + // if the value is a timestamp, that means the instance starts to be OFFLINE since that time. if (offlineInstance.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) { - PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); - if (response.isSuccessful()) { - _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); + // Drop the minion instance if it has been offline for more than a period of this task. + if (_pinotHelixResourceManager + .isInstanceOffline(offlineInstance, _minionInstanceCleanupTaskFrequencyInMilliseconds)) { + LOGGER.info("Dropping minion instance: {}", offlineInstance); + PinotResourceManagerResponse response = _pinotHelixResourceManager.dropInstance(offlineInstance); + if (response.isSuccessful()) { + _controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES, 1); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org