This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d30e28bed Add minion observability for segment upload/download failures (#10978)
2d30e28bed is described below

commit 2d30e28bedea29d264b0d7946ebb3f0e2cf1b5cc
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Mon Jun 26 19:25:26 2023 -0700

    Add minion observability for segment upload/download failures (#10978)
    
    * Add minion observability for segment upload/download failures
    
    Currently, minions do not provide observability into upload or download
    failures. This change adds a mechanism to log errors and bump metrics when
    either an upload or a download fails, so that operators can set alerts on
    these metrics to detect the inconsistent state quickly and remediate if
    possible.
    
    Issue #10973
    
    * Style fix
---
 .../apache/pinot/common/metrics/MinionMeter.java   |  4 ++-
 .../tasks/BaseSingleSegmentConversionExecutor.java | 31 +++++++++++++++++++---
 2 files changed, 30 insertions(+), 5 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
index 101a950389..376f86e55e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
@@ -29,7 +29,9 @@ public enum MinionMeter implements AbstractMetrics.Meter {
   NUMBER_TASKS_COMPLETED("tasks", false),
   NUMBER_TASKS_CANCELLED("tasks", false),
   NUMBER_TASKS_FAILED("tasks", false),
-  NUMBER_TASKS_FATAL_FAILED("tasks", false);
+  NUMBER_TASKS_FATAL_FAILED("tasks", false),
+  SEGMENT_UPLOAD_FAIL_COUNT("segments", false),
+  SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false);
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index bc440e0c5e..51c7f98543 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -32,13 +32,17 @@ import org.apache.http.HttpHeaders;
 import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
@@ -60,6 +64,7 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
   // Tracking finer grained progress status.
   protected PinotTaskConfig _pinotTaskConfig;
   protected MinionEventObserver _eventObserver;
+  protected final MinionMetrics _minionMetrics = 
MinionContext.getInstance().getMinionMetrics();
 
   /**
    * Converts the segment based on the given task config and returns the 
conversion result.
@@ -101,7 +106,14 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
       _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment 
from: " + downloadURL);
       File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
       LOGGER.info("Downloading segment from {} to {}", downloadURL, 
tarredSegmentFile.getAbsolutePath());
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL, 
tarredSegmentFile, crypterName);
+      try {
+        SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL, 
tarredSegmentFile, crypterName);
+      } catch (Exception e) {
+        _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
+        LOGGER.error("Segment download failed for {}, crypter:{}", 
downloadURL, crypterName, e);
+        _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+        Utils.rethrowException(e);
+      }
 
       // Un-tar the segment file
       _eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment 
from: " + downloadURL);
@@ -177,13 +189,24 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
 
       // Upload the tarred segment
       _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + 
segmentName);
-      SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, 
tableNameWithType, segmentName, uploadURL,
-          convertedTarredSegmentFile);
+      boolean uploadSuccessful = true;
+      try {
+        SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, 
tableNameWithType, segmentName,
+            uploadURL, convertedTarredSegmentFile);
+      } catch (Exception e) {
+        uploadSuccessful = false;
+        _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L);
+        LOGGER.error("Segment upload failed for segment {}, table {}", 
segmentName, tableNameWithType, e);
+        _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+      }
       if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
         LOGGER.warn("Failed to delete tarred converted segment: {}", 
convertedTarredSegmentFile.getAbsolutePath());
       }
 
-      LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, 
tableNameWithType, segmentName);
+      if (uploadSuccessful) {
+        LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, 
tableNameWithType, segmentName);
+      }
+
       return segmentConversionResult;
     } finally {
       FileUtils.deleteQuietly(tempDataDir);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to