This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-MinionInstancesCleanupTask
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 70907db03ac722b47873882c66558416b051a051
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jul 2 11:27:14 2021 -0700

    Fix race condition in MinionInstancesCleanupTask
---
 .../pinot/controller/BaseControllerStarter.java    |  2 +-
 .../helix/core/PinotHelixResourceManager.java      | 29 +++++++++++++++++
 .../core/minion/MinionInstancesCleanupTask.java    | 37 ++++++++++++++++++----
 3 files changed, 61 insertions(+), 7 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 5dcbd65..68d73d7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -612,7 +612,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
         _executorService);
     periodicTasks.add(_segmentRelocator);
-    _minionInstancesCleanupTask = new 
MinionInstancesCleanupTask(_helixResourceManager, _config, _controllerMetrics);
+    _minionInstancesCleanupTask = new 
MinionInstancesCleanupTask(_helixResourceManager, _leadControllerManager, 
_config, _controllerMetrics);
     periodicTasks.add(_minionInstancesCleanupTask);
 
     return periodicTasks;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index bb01ff1..5397e53 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -66,6 +66,7 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
@@ -462,6 +463,34 @@ public class PinotHelixResourceManager {
   }
 
  /**
+   * Returns whether an instance is absent from "/LIVEINSTANCES" AND has been offline for longer than a grace period.
+   * Since ZNodes under "/LIVEINSTANCES" are ephemeral, a ZK session expiration (e.g. due to a network issue) deletes the
+   * instance's ZNode under "/LIVEINSTANCES" even though the instance may reconnect shortly afterwards. To double-check,
+   * the field "LAST_OFFLINE_TIME" of the ZNode "/INSTANCES/<instance_id>/HISTORY" is consulted: -1 means ONLINE;
+   * any other value is the timestamp (ms, compared against System.currentTimeMillis()) since which it has been OFFLINE.
+   * @param instanceId instance id
+   * @param offlineTimeRangeMs grace period in ms; an instance offline for no longer than this is not reported as offline
+   * @return true only if the instance has no live-instance ZNode and has been offline for more than offlineTimeRangeMs
+   */
+  public boolean isInstanceOffline(String instanceId, long offlineTimeRangeMs) 
{
+    // An instance that still has a ZNode under /LIVEINSTANCES is online.
+    if (_helixDataAccessor.getProperty(_keyBuilder.liveInstance(instanceId)) 
!= null) {
+      return false;
+    }
+    ParticipantHistory participantHistory = 
_helixDataAccessor.getProperty(_keyBuilder.participantHistory(instanceId));
+    long lastOfflineTime = participantHistory.getLastOfflineTime(); // NOTE(review): getProperty() can return null (cf. the liveInstance null check above); presumably a missing HISTORY ZNode throws an NPE here - consider a null guard
+    if (lastOfflineTime == -1) {
+      return false;
+    }
+    if (System.currentTimeMillis() - lastOfflineTime > offlineTimeRangeMs) {
+      LOGGER.info("Instance: {} has been offline for more than {}ms", 
instanceId, offlineTimeRangeMs);
+      return true;
+    }
+    // Offline, but for no longer than offlineTimeRangeMs (e.g. a transient ZK session expiration): not offline yet.
+    return false;
+  }
+
+  /**
    * Tenant related APIs
    */
   // TODO: move tenant related APIs here
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
index 87fd80b..838b652 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
@@ -23,36 +23,61 @@ import java.util.List;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * A periodic task to clean up offline Minion instances to not spam Helix.
  */
 public class MinionInstancesCleanupTask extends BasePeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionInstancesCleanupTask.class);
+  private final static String TASK_NAME = "MinionInstancesCleanupTask";
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
+  private final long _minionInstanceCleanupTaskFrequencyInMilliseconds;
 
-  public MinionInstancesCleanupTask(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
-    super("MinionInstancesCleanupTask", 
controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
+  public MinionInstancesCleanupTask(PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf 
controllerConf, ControllerMetrics controllerMetrics) {
+    super(TASK_NAME, 
controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
         controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds());
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
+    _minionInstanceCleanupTaskFrequencyInMilliseconds =
+        controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds() * 
1000L;
   }
 
   @Override
   protected void runTask() {
+    // Make it so that only one controller returns the metric for all the 
tasks.
+    if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
+      return;
+    }
+
     List<String> offlineInstances = new 
ArrayList<>(_pinotHelixResourceManager.getAllInstances());
     
offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
     for (String offlineInstance : offlineInstances) {
+      // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK 
session expire (e.g. due to network issue),
+      // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race 
condition can happen when this task is running.
+      // In order to double confirm the live status of an instance, the field 
"LAST_OFFLINE_TIME" in ZNode under
+      // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value 
is "-1", that means the instance is ONLINE;
+      // if the value is a timestamp, that means the instance starts to be 
OFFLINE since that time.
       if 
(offlineInstance.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
-        PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
-        if (response.isSuccessful()) {
-          
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
 1);
+        // Drop the minion instance if it has been offline for more than a 
period of this task.
+        if (_pinotHelixResourceManager
+            .isInstanceOffline(offlineInstance, 
_minionInstanceCleanupTaskFrequencyInMilliseconds)) {
+          LOGGER.info("Dropping minion instance: {}", offlineInstance);
+          PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
+          if (response.isSuccessful()) {
+            
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
 1);
+          }
         }
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to