klsince commented on code in PR #11740:
URL: https://github.com/apache/pinot/pull/11740#discussion_r1347844036


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check whether a user-triggered rebalance job has completed, and to retry it if it failed. The
+ * retry job is started with the same rebalance configs provided by the user and makes a best effort to stop the
+ * other jobs for the same table. This task can be configured to only check for failures and report metrics, without
+ * doing the retry.
+ */
+public class RebalanceChecker extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceChecker.class);
+  private final boolean _checkOnly;
+  private final ExecutorService _executorService;
+
+  public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
+    super(RebalanceChecker.class.getSimpleName(), config.getRebalanceCheckerFrequencyInSeconds(),
+        config.getRebalanceCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
+    _checkOnly = config.isRebalanceCheckerCheckOnly();
+    _executorService = executorService;
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    _executorService.submit(() -> {
+      try {
+        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
+        retryRebalanceTable(tableNameWithType, tableConfig);
+      } catch (Throwable t) {
+        LOGGER.error("Failed to retry rebalance for table: {}", 
tableNameWithType, t);
+      }
+    });
+  }
+
+  @VisibleForTesting
+  void retryRebalanceTable(String tableNameWithType, TableConfig tableConfig)
+      throws Exception {
+    // Get all rebalance jobs as tracked in ZK for this table to check if there is any failure to retry.
+    Map<String, Map<String, String>> allJobMetadata = _pinotHelixResourceManager.getAllJobsForTable(tableNameWithType,
+        Collections.singleton(ControllerJobType.TABLE_REBALANCE));
+    if (allJobMetadata.isEmpty()) {
+      LOGGER.info("No rebalance job has been triggered for table: {}. Skip 
retry", tableNameWithType);
+      return;
+    }
+    // Skip retry for the table if a rebalance job is still running or has completed, specifically:
+    // 1) Skip retry if any rebalance job is actively running. Being actively running means the job is at IN_PROGRESS
+    // status and has updated its status kept in ZK within the heartbeat timeout. It's possible that more than one
+    // rebalance job is running for the table, but that's fine with an idempotent rebalance algorithm.
+    // 2) Skip retry if the most recently started rebalance job has completed with DONE or NO_OP. It's possible that
+    // jobs started earlier are still running, but they are ignored here.
+    //
+    // Otherwise, we get a list of failed rebalance jobs, i.e. those at FAILED status, or at IN_PROGRESS status but
+    // without a status update kept in ZK within the heartbeat timeout. For those candidate jobs to retry:
+    // 1) First, group them by the original jobIds they retry for, so that we can skip those that exceeded maxRetry.
+    // 2) For the remaining jobs, take the one started most recently and retry it with its original configs.
+    // 3) If configured, abort the other rebalance jobs for the table by setting their status to FAILED.
+    Map<String/*original jobId*/, Set<Pair<TableRebalanceRetryConfig/*original or retry jobId*/, Long
+        /*job startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType, allJobMetadata);
+    if (candidateJobs.isEmpty()) {
+      LOGGER.info("Found no failed rebalance jobs for table: {}. Skip retry", 
tableNameWithType);
+      return;
+    }
+    _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_FAILURE_DETECTED, 1L);
+    if (_checkOnly) {
+      LOGGER.info("Found failed rebalance jobs: {} for table: {}, but check 
only", candidateJobs.keySet(),
+          tableNameWithType);
+      return;
+    }
+    TableRebalanceRetryConfig retryConfig = getLatestJob(candidateJobs);
+    if (retryConfig == null) {
+      LOGGER.info("Rebalance is retried enough times for table: {}. Skip 
retry", tableNameWithType);
+      return;
+    }
+    tryStopExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+    String newRetryJobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    // Reset the JOB_ID of rebalanceConfig to be the new retry job ID.
+    RebalanceConfig rebalanceConfig = retryConfig.getConfig();
+    rebalanceConfig.setJobId(newRetryJobId);
+    ZkBasedTableRebalanceObserver observer = new ZkBasedTableRebalanceObserver(tableNameWithType, newRetryJobId,
+        TableRebalanceRetryConfig.forRetryRun(retryConfig.getOriginalJobId(), rebalanceConfig,
+            retryConfig.getRetryNum() + 1), _pinotHelixResourceManager);
+    LOGGER.info("Retry original rebalance job: {} for table: {} with new retry 
job: {}", retryConfig.getOriginalJobId(),
+        tableNameWithType, newRetryJobId);
+    _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.TABLE_REBALANCE_RETRY, 1L);
+    RebalanceResult result =
+        _pinotHelixResourceManager.rebalanceTable(tableNameWithType, tableConfig, newRetryJobId, rebalanceConfig,
+            observer);
+    LOGGER.info("Retry job: {} for table: {} is done with result: {}", 
newRetryJobId, tableNameWithType, result);
+  }
+
+  private static void tryStopExistingJobs(String tableNameWithType,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    pinotHelixResourceManager.updateAllJobsForTable(tableNameWithType,
+        ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE),
+        jobMetadata -> updateJobMetadata(tableNameWithType, jobMetadata));
+  }
+
+  private static void updateJobMetadata(String tableNameWithType, Map<String, String> jobMetadata) {
+    String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+    try {
+      String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+      TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      LOGGER.info("Set status of rebalance job: {} for table: {} from: {} to: 
FAILED", jobId, tableNameWithType,
+          jobStats.getStatus());
+      jobStats.setStatus(RebalanceResult.Status.FAILED.toString());
+      jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+          JsonUtils.objectToString(jobStats));
+    } catch (Exception e) {
+      LOGGER.error("Failed to update status of rebalance job: {} for table: 
{}", jobId, tableNameWithType, e);
+    }
+  }
+
+  @VisibleForTesting
+  static TableRebalanceRetryConfig getLatestJob(Map<String, Set<Pair<TableRebalanceRetryConfig, Long>>> candidateJobs) {
+    Pair<TableRebalanceRetryConfig, Long> candidateJobRun = null;
+    for (Map.Entry<String, Set<Pair<TableRebalanceRetryConfig, Long>>> entry : 
candidateJobs.entrySet()) {
+      // The job configs from all retry jobs are the same, as the same set of job configs is used to do the retry.
+      // The job metadata kept in ZK is gradually cleaned up in submission time order, so we can't compare Set.size()
+      // against maxRetry, but instead check the retryNum of each run to see if retries have exceeded the limit.
+      Set<Pair<TableRebalanceRetryConfig, Long>> jobRuns = entry.getValue();
+      int maxRetry = jobRuns.iterator().next().getLeft().getConfig().getMaxRetry();
+      Pair<TableRebalanceRetryConfig, Long> latestJobRun = null;
+      for (Pair<TableRebalanceRetryConfig, Long> jobRun : jobRuns) {
+        if (jobRun.getLeft().getRetryNum() >= maxRetry) {
+          latestJobRun = null;
+          break;
+        }
+        if (latestJobRun == null || latestJobRun.getRight() < jobRun.getRight()) {
+          latestJobRun = jobRun;
+        }
+      }
+      if (latestJobRun == null) {
+        LOGGER.info("Rebalance job: {} had exceeded maxRetry: {}. Skip retry", 
entry.getKey(), maxRetry);
+        continue;
+      }
+      if (candidateJobRun == null || candidateJobRun.getRight() < latestJobRun.getRight()) {
+        candidateJobRun = latestJobRun;
+      }
+    }
+    return candidateJobRun == null ? null : candidateJobRun.getLeft();
+  }
+
+  @VisibleForTesting
+  static Map<String, Set<Pair<TableRebalanceRetryConfig, Long>>> 
getCandidateJobs(String tableNameWithType,
+      Map<String, Map<String, String>> allJobMetadata)
+      throws Exception {
+    long nowMs = System.currentTimeMillis();
+    Map<String, Set<Pair<TableRebalanceRetryConfig, Long>>> candidates = new 
HashMap<>();
+    // If the job started most recently has already completed, then skip retry for the table.
+    Pair<String, Long> latestStartedJob = null;
+    Pair<String, Long> latestCompletedJob = null;
+    // The processing order of job metadata from the given Map is not deterministic. Track the completed original
+    // jobs so that we can simply skip the retry jobs belonging to the completed original jobs.
+    Map<String, String> completedOriginalJobs = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
allJobMetadata.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobMetadata = entry.getValue();
+      long statsUpdatedAt = Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+      String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+      if (StringUtils.isEmpty(jobStatsInStr)) {
+        LOGGER.info("Skip rebalance job: {} as it has no job progress stats", 
jobId);
+        continue;
+      }
+      String retryCfgInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_RETRY_CONFIG);
+      if (StringUtils.isEmpty(retryCfgInStr)) {
+        LOGGER.info("Skip rebalance job: {} as it has no job retry configs", 
jobId);
+        continue;
+      }
+      TableRebalanceProgressStats progress = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      TableRebalanceRetryConfig retryConfig = JsonUtils.stringToObject(retryCfgInStr, TableRebalanceRetryConfig.class);
+      long jobStartTime = progress.getStartTimeMs();
+      if (latestStartedJob == null || latestStartedJob.getRight() < jobStartTime) {
+        latestStartedJob = Pair.of(jobId, jobStartTime);
+      }
+      String originalJobId = retryConfig.getOriginalJobId();
+      RebalanceResult.Status jobStatus = RebalanceResult.Status.valueOf(progress.getStatus());
+      if (jobStatus == RebalanceResult.Status.DONE || jobStatus == RebalanceResult.Status.NO_OP) {
+        LOGGER.info("Skip rebalance job: {} as it has completed with status: 
{}", jobId, jobStatus);
+        completedOriginalJobs.put(originalJobId, jobId);
+        if (latestCompletedJob == null || latestCompletedJob.getRight() < jobStartTime) {
+          latestCompletedJob = Pair.of(jobId, jobStartTime);
+        }
+        continue;
+      }
+      if (jobStatus == RebalanceResult.Status.FAILED) {
+        LOGGER.info("Found failed rebalance job: {} for original job: {}", 
jobId, originalJobId);
+        candidates.computeIfAbsent(originalJobId, (k) -> new HashSet<>()).add(Pair.of(retryConfig, jobStartTime));
+        continue;
+      }
+      // Check if an IN_PROGRESS job is still actively running.
+      long heartbeatTimeoutMs = retryConfig.getConfig().getHeartbeatTimeoutInMs();
+      if (nowMs - statsUpdatedAt < heartbeatTimeoutMs) {
+        LOGGER.info("Rebalance job: {} is actively running with status updated 
at: {} within timeout: {}. Skip "
+            + "retry for table: {}", jobId, statsUpdatedAt, 
heartbeatTimeoutMs, tableNameWithType);
+        return Collections.emptyMap();
+      }
+      // The job is considered failed, but it's possible that it is still running, in which case we might end up with
+      // more than one rebalance job running in parallel for a table. The rebalance algorithm is idempotent, so this
+      // should be fine for correctness.
+      LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, 
originalJobId);
+      candidates.computeIfAbsent(originalJobId, (k) -> new HashSet<>()).add(Pair.of(retryConfig, jobStartTime));

Review Comment:
   yup, method tryStopExistingJobs() does this
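
   For readers following the retry flow above, here is a minimal standalone sketch of the selection logic described in the comments of getLatestJob(): group the failed runs by the original job they retry, drop any group that has already exhausted its retry budget, and pick the most recently started run overall. The `JobRun` record is a hypothetical stand-in for `TableRebalanceRetryConfig` plus its start time, not the PR's actual class:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   public class RetryCandidateSelection {
     // Hypothetical stand-in for TableRebalanceRetryConfig paired with its start time:
     // which original job a run retries, which attempt it is, and the retry budget.
     record JobRun(String originalJobId, int retryNum, int maxRetry, long startTimeMs) {}

     // Mirrors getLatestJob(): within each original job, discard the whole group if any
     // run has reached maxRetry (retryNum is checked per run because ZK job metadata is
     // cleaned up over time, so group size is not reliable); otherwise keep the most
     // recently started run. Across groups, return the most recently started survivor.
     static JobRun pickRetryCandidate(Map<String, Set<JobRun>> candidates) {
       JobRun best = null;
       for (Set<JobRun> jobRuns : candidates.values()) {
         JobRun latest = null;
         for (JobRun run : jobRuns) {
           if (run.retryNum() >= run.maxRetry()) {
             latest = null; // retry budget exhausted for this original job
             break;
           }
           if (latest == null || latest.startTimeMs() < run.startTimeMs()) {
             latest = run;
           }
         }
         if (latest != null && (best == null || best.startTimeMs() < latest.startTimeMs())) {
           best = latest;
         }
       }
       return best;
     }

     public static void main(String[] args) {
       Map<String, Set<JobRun>> candidates = new HashMap<>();
       // job1 already used both retries (maxRetry=2), so its whole group is skipped.
       candidates.put("job1", Set.of(new JobRun("job1", 2, 2, 300L)));
       // job2 has a failed original run and a failed first retry; the retry (t=200) wins.
       candidates.put("job2", Set.of(new JobRun("job2", 0, 2, 100L), new JobRun("job2", 1, 2, 200L)));
       System.out.println(pickRetryCandidate(candidates)); // prints the job2 run with retryNum=1
     }
   }
   ```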


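   And a similarly simplified model of how getCandidateJobs() classifies each run, again with hypothetical JobMeta/Status types rather than the PR's actual job metadata: DONE/NO_OP marks the original job completed, FAILED becomes a retry candidate, an IN_PROGRESS run with a fresh heartbeat short-circuits retry for the whole table, and one with a stale heartbeat is treated as stuck, i.e. failed:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class CandidateScan {
     enum Status { DONE, NO_OP, FAILED, IN_PROGRESS }

     // Hypothetical flattened view of one rebalance run's ZK job metadata.
     record JobMeta(String jobId, String originalJobId, Status status,
                    long statsUpdatedAtMs, long heartbeatTimeoutMs) {}

     // Mirrors the per-job classification in getCandidateJobs().
     static Map<String, List<JobMeta>> scan(long nowMs, List<JobMeta> jobs) {
       Map<String, List<JobMeta>> candidates = new HashMap<>();
       for (JobMeta job : jobs) {
         switch (job.status()) {
           case DONE, NO_OP -> {
             // Completed; the real code also records it in completedOriginalJobs so
             // that retry runs of a completed original job can be pruned afterwards.
           }
           case FAILED -> candidates.computeIfAbsent(job.originalJobId(), k -> new ArrayList<>()).add(job);
           case IN_PROGRESS -> {
             if (nowMs - job.statsUpdatedAtMs() < job.heartbeatTimeoutMs()) {
               return Map.of(); // actively running: skip retry for the table entirely
             }
             // Heartbeat is stale: treat the run as stuck, i.e. a failed candidate.
             candidates.computeIfAbsent(job.originalJobId(), k -> new ArrayList<>()).add(job);
           }
         }
       }
       return candidates;
     }

     public static void main(String[] args) {
       List<JobMeta> jobs = List.of(
           new JobMeta("run1", "job1", Status.FAILED, 1_000L, 3_000L),
           new JobMeta("run2", "job1", Status.IN_PROGRESS, 2_000L, 3_000L)); // stale heartbeat
       System.out.println(scan(10_000L, jobs)); // both runs become candidates under job1
     }
   }
   ```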
