This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c39a5f71 Repurpose MinionInstancesCleanupTask to StaleInstancesCleanupTask to remove stale broker and server instances as well. (#10027)
98c39a5f71 is described below

commit 98c39a5f718b7d1f623cd4f856a720206773544d
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Wed Jan 4 21:47:42 2023 -0800

    Repurpose MinionInstancesCleanupTask to StaleInstancesCleanupTask to remove stale broker and server instances as well. (#10027)
---
 .../pinot/common/metrics/ControllerGauge.java      |   8 +-
 .../pinot/controller/BaseControllerStarter.java    |  14 +-
 .../apache/pinot/controller/ControllerConf.java    |  52 ++++++++
 .../core/cleanup/StaleInstancesCleanupTask.java    | 146 +++++++++++++++++++++
 .../core/minion/MinionInstancesCleanupTask.java    |  88 -------------
 .../StaleInstancesCleanupTaskStatelessTest.java    | 138 +++++++++++++++++++
 .../MinionInstancesCleanupTaskStatelessTest.java   |  82 ------------
 7 files changed, 350 insertions(+), 178 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index b44bd8e999..64ced0203c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -101,9 +101,15 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // Number of Tasks Status
   TASK_STATUS("taskStatus", false),
 
-  // Number of dropped minion instances
+  // Number of dropped stale minion instances
   DROPPED_MINION_INSTANCES("droppedMinionInstances", true),
 
+  // Number of dropped stale broker instances
+  DROPPED_BROKER_INSTANCES("droppedBrokerInstances", true),
+
+  // Number of dropped stale server instances
+  DROPPED_SERVER_INSTANCES("droppedServerInstances", true),
+
   // Number of online minion instances
   ONLINE_MINION_INSTANCES("onlineMinionInstances", true),
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f6ebb5006d..83db19eccf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -76,7 +76,7 @@ import 
org.apache.pinot.controller.api.resources.InvalidControllerConfigExceptio
 import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import 
org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
+import 
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -164,7 +164,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected SegmentCompletionManager _segmentCompletionManager;
   protected LeadControllerManager _leadControllerManager;
   protected List<ServiceStatus.ServiceStatusCallback> 
_serviceStatusCallbackList;
-  protected MinionInstancesCleanupTask _minionInstancesCleanupTask;
+  protected StaleInstancesCleanupTask _staleInstancesCleanupTask;
   protected TaskMetricsEmitter _taskMetricsEmitter;
   protected MultiThreadedHttpConnectionManager _connectionManager;
 
@@ -295,8 +295,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     return _taskManager;
   }
 
-  public MinionInstancesCleanupTask getMinionInstancesCleanupTask() {
-    return _minionInstancesCleanupTask;
+  public StaleInstancesCleanupTask getStaleInstancesCleanupTask() {
+    return _staleInstancesCleanupTask;
   }
 
   @Override
@@ -687,9 +687,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
         _executorService, _connectionManager);
     periodicTasks.add(_segmentRelocator);
-    _minionInstancesCleanupTask =
-        new MinionInstancesCleanupTask(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics);
-    periodicTasks.add(_minionInstancesCleanupTask);
+    _staleInstancesCleanupTask =
+        new StaleInstancesCleanupTask(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics);
+    periodicTasks.add(_staleInstancesCleanupTask);
     _taskMetricsEmitter =
         new TaskMetricsEmitter(_helixResourceManager, 
_helixTaskResourceManager, _leadControllerManager, _config,
             _controllerMetrics);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e24530ab4c..015b1e90de 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -129,16 +129,27 @@ public class ControllerConf extends PinotConfiguration {
     @Deprecated
     public static final String 
DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS =
         "controller.minion.instances.cleanup.task.frequencyInSeconds";
+    @Deprecated
     public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
         "controller.minion.instances.cleanup.task.frequencyPeriod";
+    @Deprecated
     public static final String 
MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
         "controller.minion.instances.cleanup.task.initialDelaySeconds";
     // Deprecated as of 0.8.0
     @Deprecated
     public static final String 
DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS
 =
         
"controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionSeconds";
+    @Deprecated
     public static final String 
MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD =
         
"controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod";
+
+    public static final String STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
+        "controller.stale.instances.cleanup.task.frequencyPeriod";
+    public static final String 
STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
+        "controller.stale.instances.cleanup.task.initialDelaySeconds";
+    public static final String 
STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD =
+        
"controller.stale.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod";
+
     // Deprecated as of 0.8.0
     @Deprecated
     public static final String 
DEPRECATED_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS =
@@ -209,7 +220,9 @@ public class ControllerConf extends PinotConfiguration {
     private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS 
= 5 * 60; // 5 minutes
     private static final int 
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
     private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; 
// Disabled
+    @Deprecated
     private static final int 
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    @Deprecated
     private static final int 
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS
 =
         60 * 60; // 1 Hour.
 
@@ -699,6 +712,7 @@ public class ControllerConf extends PinotConfiguration {
         Integer.toString(frequencyInSeconds));
   }
 
+  @Deprecated
   public int getMinionInstancesCleanupTaskFrequencyInSeconds() {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
@@ -706,21 +720,25 @@ public class ControllerConf extends PinotConfiguration {
                 
ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS));
   }
 
+  @Deprecated
   public void setMinionInstancesCleanupTaskFrequencyInSeconds(int 
frequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS,
         Integer.toString(frequencyInSeconds));
   }
 
+  @Deprecated
   public long getMinionInstancesCleanupTaskInitialDelaySeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
   }
 
+  @Deprecated
   public void setMinionInstancesCleanupTaskInitialDelaySeconds(int 
initialDelaySeconds) {
     
setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
         Integer.toString(initialDelaySeconds));
   }
 
+  @Deprecated
   public int 
getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds() {
     return Optional.ofNullable(
         
getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD))
@@ -731,12 +749,46 @@ public class ControllerConf extends PinotConfiguration {
                 
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS));
   }
 
+  @Deprecated
   public void 
setMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds(int 
maxOfflineTimeRangeInSeconds) {
     setProperty(
         
ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS,
         Integer.toString(maxOfflineTimeRangeInSeconds));
   }
 
+  public int getStaleInstancesCleanupTaskFrequencyInSeconds() {
+    return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD))
+        .map(period -> (int) convertPeriodToSeconds(period))
+        // Backward compatible for existing users who configured 
MinionInstancesCleanupTask
+        .orElse(getMinionInstancesCleanupTaskFrequencyInSeconds());
+  }
+
+  public void setStaleInstanceCleanupTaskFrequencyInSeconds(String 
frequencyPeriod) {
+    
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD,
 frequencyPeriod);
+  }
+
+  public long getStaleInstanceCleanupTaskInitialDelaySeconds() {
+    return 
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
+        // Backward compatible for existing users who configured 
MinionInstancesCleanupTask
+        getMinionInstancesCleanupTaskInitialDelaySeconds());
+  }
+
+  public void setStaleInstanceCleanupTaskInitialDelaySeconds(long 
initialDelaySeconds) {
+    
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
 initialDelaySeconds);
+  }
+
+  public int getStaleInstancesCleanupTaskInstancesRetentionInSeconds() {
+    return Optional.ofNullable(
+            
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD))
+        .map(period -> (int) convertPeriodToSeconds(period))
+        // Backward compatible for existing users who configured 
MinionInstancesCleanupTask
+        
.orElse(getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds());
+  }
+
+  public void setStaleInstancesCleanupTaskInstancesRetentionPeriod(String 
retentionPeriod) {
+    
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD,
 retentionPeriod);
+  }
+
   public int getDefaultTableMinReplicas() {
     return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
new file mode 100644
index 0000000000..b257462985
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Automatically removes stale instances from the cluster to not spam Helix.
+ * A stale instance is an instance that is not in use (not hosting any data or query) and has been offline for
+ * longer than the stale instance retention time.
+ */
+public class StaleInstancesCleanupTask extends BasePeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StaleInstancesCleanupTask.class);
+  private final static String TASK_NAME = "StaleInstancesCleanupTask";
+
+  protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
+  protected final ControllerMetrics _controllerMetrics;
+  // This applies to minion, broker and server instances.
+  private final long 
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds;
+
+  public StaleInstancesCleanupTask(PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf 
controllerConf, ControllerMetrics controllerMetrics) {
+    super(TASK_NAME, 
controllerConf.getStaleInstancesCleanupTaskFrequencyInSeconds(),
+        controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds());
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
+    _controllerMetrics = controllerMetrics;
+    _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds =
+        
controllerConf.getStaleInstancesCleanupTaskInstancesRetentionInSeconds() * 
1000L;
+  }
+
+  @Override
+  protected void runTask(Properties periodicTaskProperties) {
+    // Make it so that only one controller is responsible for cleaning up stale instances.
+    if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
+      return;
+    }
+
+    List<String> offlineInstances = new 
ArrayList<>(_pinotHelixResourceManager.getAllInstances());
+    
offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+
+    Set<String> serverInstancesInUse = getServerInstancesInUse();
+    Set<String> brokerInstancesInUse = getBrokerInstancesInUse();
+
+    for (String offlineInstance : offlineInstances) {
+      // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK 
session expire (e.g. due to network issue),
+      // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race 
condition can happen when this task is
+      // running.
+      // In order to double confirm the live status of an instance, the field 
"LAST_OFFLINE_TIME" in ZNode under
+      // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value 
is "-1", that means the instance is
+      // ONLINE;
+      // if the value is a timestamp, that means the instance starts to be 
OFFLINE since that time.
+      if (InstanceTypeUtils.isMinion(offlineInstance)) {
+        // Drop the minion instance if it has been offline for longer than the retention period.
+        if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+            
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+          LOGGER.info("Dropping minion instance: {}", offlineInstance);
+          PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
+          if (response.isSuccessful()) {
+            
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
 1);
+          }
+        }
+        continue;
+      }
+
+      // Drop the broker instance if it is not in use and has been offline for longer than the retention period.
+      if (InstanceTypeUtils.isBroker(offlineInstance) && 
!brokerInstancesInUse.contains(offlineInstance)) {
+        if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+            
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+          LOGGER.info("Dropping broker instance: {}", offlineInstance);
+          PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
+          if (response.isSuccessful()) {
+            
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES,
 1);
+          }
+        }
+        continue;
+      }
+
+      // Drop the server instance if it is not in use and has been offline for longer than the retention period.
+      if (InstanceTypeUtils.isServer(offlineInstance) && 
!serverInstancesInUse.contains(offlineInstance)) {
+        if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+            
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+          LOGGER.info("Dropping server instance: {}", offlineInstance);
+          PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
+          if (response.isSuccessful()) {
+            
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES,
 1);
+          }
+        }
+      }
+    }
+  }
+
+  private Set<String> getBrokerInstancesInUse() {
+    Set<String> brokerInstancesInUse = new HashSet<>();
+    final IdealState brokerResource = 
_pinotHelixResourceManager.getHelixAdmin()
+        
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    brokerResource.getPartitionSet()
+        .forEach(table -> 
brokerInstancesInUse.addAll(brokerResource.getInstanceSet(table)));
+    return brokerInstancesInUse;
+  }
+
+  private Set<String> getServerInstancesInUse() {
+    Set<String> serverInstancesInUse = new HashSet<>();
+    _pinotHelixResourceManager.getAllTables().forEach(tableName -> 
serverInstancesInUse.addAll(
+        
Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName))
+            .map(is -> 
is.getInstanceSet(tableName)).orElse(Collections.emptySet())));
+    return serverInstancesInUse;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
deleted file mode 100644
index 09392c6b00..0000000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.minion;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.LeadControllerManager;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.core.periodictask.BasePeriodicTask;
-import org.apache.pinot.spi.utils.InstanceTypeUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A periodic task to clean up offline Minion instances to not spam Helix.
- */
-public class MinionInstancesCleanupTask extends BasePeriodicTask {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionInstancesCleanupTask.class);
-  private final static String TASK_NAME = "MinionInstancesCleanupTask";
-  protected final PinotHelixResourceManager _pinotHelixResourceManager;
-  protected final LeadControllerManager _leadControllerManager;
-  protected final ControllerMetrics _controllerMetrics;
-  private final long 
_minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds;
-
-  public MinionInstancesCleanupTask(PinotHelixResourceManager 
pinotHelixResourceManager,
-      LeadControllerManager leadControllerManager, ControllerConf 
controllerConf, ControllerMetrics controllerMetrics) {
-    super(TASK_NAME, 
controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
-        controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds());
-    _pinotHelixResourceManager = pinotHelixResourceManager;
-    _leadControllerManager = leadControllerManager;
-    _controllerMetrics = controllerMetrics;
-    _minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds =
-        
controllerConf.getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds()
 * 1000L;
-  }
-
-  @Override
-  protected void runTask(Properties periodicTaskProperties) {
-    // Make it so that only one controller is responsible for cleaning up 
minion instances.
-    if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
-      return;
-    }
-
-    List<String> offlineInstances = new 
ArrayList<>(_pinotHelixResourceManager.getAllInstances());
-    
offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
-    for (String offlineInstance : offlineInstances) {
-      // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK 
session expire (e.g. due to network issue),
-      // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race 
condition can happen when this task is
-      // running.
-      // In order to double confirm the live status of an instance, the field 
"LAST_OFFLINE_TIME" in ZNode under
-      // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value 
is "-1", that means the instance is
-      // ONLINE;
-      // if the value is a timestamp, that means the instance starts to be 
OFFLINE since that time.
-      if (InstanceTypeUtils.isMinion(offlineInstance)) {
-        // Drop the minion instance if it has been offline for more than a 
period of this task.
-        if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
-            
_minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
-          LOGGER.info("Dropping minion instance: {}", offlineInstance);
-          PinotResourceManagerResponse response = 
_pinotHelixResourceManager.dropInstance(offlineInstance);
-          if (response.isSuccessful()) {
-            
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
 1);
-          }
-        }
-      }
-    }
-  }
-}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
new file mode 100644
index 0000000000..4bc0b4cd10
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "stateless")
+public class StaleInstancesCleanupTaskStatelessTest extends ControllerTest {
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    startZk();
+    startController();
+  }
+
+  @Test
+  public void testStaleInstancesCleanupTaskForBrokers()
+      throws Exception {
+    StaleInstancesCleanupTask staleInstancesCleanupTask = 
_controllerStarter.getStaleInstancesCleanupTask();
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
 0);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(3, true);
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
 0);
+    stopFakeInstance("Broker_localhost_0");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
 1);
+    stopFakeInstance("Broker_localhost_1");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
 2);
+    stopFakeInstance("Broker_localhost_2");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
 3);
+  }
+
+  @Test
+  public void testStaleInstancesCleanupTaskForServers()
+      throws Exception {
+    StaleInstancesCleanupTask staleInstancesCleanupTask = 
_controllerStarter.getStaleInstancesCleanupTask();
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
 0);
+    addFakeServerInstancesToAutoJoinHelixCluster(3, true);
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
 0);
+    stopFakeInstance("Server_localhost_0");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
 1);
+    stopFakeInstance("Server_localhost_1");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
 2);
+    stopFakeInstance("Server_localhost_2");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
 3);
+  }
+
+  @Test
+  public void testStaleInstancesCleanupTaskForMinions()
+      throws Exception {
+    StaleInstancesCleanupTask staleInstancesCleanupTask = 
_controllerStarter.getStaleInstancesCleanupTask();
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 0);
+    addFakeMinionInstancesToAutoJoinHelixCluster(3);
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 0);
+    stopFakeInstance("Minion_localhost_0");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 1);
+    stopFakeInstance("Minion_localhost_1");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 2);
+    stopFakeInstance("Minion_localhost_2");
+    Thread.sleep(1000);
+    staleInstancesCleanupTask.runTask(new Properties());
+    Assert.assertEquals(
+        
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 3);
+  }
+
+  @Override
+  public Map<String, Object> getDefaultControllerConfiguration() {
+    Map<String, Object> properties = super.getDefaultControllerConfiguration();
+    // Shorten the retention period to 1s so that stopped instances become eligible for cleanup within the test.
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.
+        MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD, 
"1s");
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.
+        STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD, "1s");
+    return properties;
+  }
+
+  @AfterClass
+  public void teardown() {
+    stopController();
+    stopZk();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
deleted file mode 100644
index 950f7c5492..0000000000
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.minion;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-@Test(groups = "stateless")
-public class MinionInstancesCleanupTaskStatelessTest extends ControllerTest {
-  @BeforeClass
-  public void setup()
-      throws Exception {
-    startZk();
-    startController();
-  }
-
-  @Test
-  public void testMinionInstancesCleanupTask()
-      throws Exception {
-    MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask();
-    minionInstancesCleanupTask.runTask(new Properties());
-    Assert.assertEquals(
-        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0);
-    addFakeMinionInstancesToAutoJoinHelixCluster(3);
-    Assert.assertEquals(
-        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0);
-    stopFakeInstance("Minion_localhost_0");
-    Thread.sleep(1000);
-    minionInstancesCleanupTask.runTask(new Properties());
-    Assert.assertEquals(
-        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1);
-    stopFakeInstance("Minion_localhost_1");
-    Thread.sleep(1000);
-    minionInstancesCleanupTask.runTask(new Properties());
-    Assert.assertEquals(
-        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2);
-    stopFakeInstance("Minion_localhost_2");
-    Thread.sleep(1000);
-    minionInstancesCleanupTask.runTask(new Properties());
-    Assert.assertEquals(
-        _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3);
-  }
-
-  @Override
-  public Map<String, Object> getDefaultControllerConfiguration() {
-    Map<String, Object> properties = super.getDefaultControllerConfiguration();
-    // Override the cleanup before deletion period so that test can avoid stuck failure
-    properties.put(ControllerConf.ControllerPeriodicTasksConf.
-        MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD, "1s");
-    return properties;
-  }
-
-  @AfterClass
-  public void teardown() {
-    stopController();
-    stopZk();
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to