This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 2d30e28bed Add minion observability for segment upload/download failures (#10978)
2d30e28bed is described below

commit 2d30e28bedea29d264b0d7946ebb3f0e2cf1b5cc
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Mon Jun 26 19:25:26 2023 -0700

    Add minion observability for segment upload/download failures (#10978)

    * Add minion observability for segment upload/download failures

    Currently, minions do not provide observability into upload or download failures.
    Added mechanism to log errors and bump metrics when either upload or download fails,
    so that operators can set alerts on these metrics to detect the inconsistent state quickly
    and remediate if possible.

    Issue #10973

    * Style fix
---
 .../apache/pinot/common/metrics/MinionMeter.java   |  4 ++-
 .../tasks/BaseSingleSegmentConversionExecutor.java | 31 +++++++++++++++++++---
 2 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
index 101a950389..376f86e55e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
@@ -29,7 +29,9 @@ public enum MinionMeter implements AbstractMetrics.Meter {
   NUMBER_TASKS_COMPLETED("tasks", false),
   NUMBER_TASKS_CANCELLED("tasks", false),
   NUMBER_TASKS_FAILED("tasks", false),
-  NUMBER_TASKS_FATAL_FAILED("tasks", false);
+  NUMBER_TASKS_FATAL_FAILED("tasks", false),
+  SEGMENT_UPLOAD_FAIL_COUNT("segments", false),
+  SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false);
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index bc440e0c5e..51c7f98543 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -32,13 +32,17 @@ import org.apache.http.HttpHeaders;
 import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
@@ -60,6 +64,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
   // Tracking finer grained progress status.
   protected PinotTaskConfig _pinotTaskConfig;
   protected MinionEventObserver _eventObserver;
+  protected final MinionMetrics _minionMetrics = MinionContext.getInstance().getMinionMetrics();
 
   /**
    * Converts the segment based on the given task config and returns the conversion result.
@@ -101,7 +106,14 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
       _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment from: " + downloadURL);
       File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
       LOGGER.info("Downloading segment from {} to {}", downloadURL, tarredSegmentFile.getAbsolutePath());
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL, tarredSegmentFile, crypterName);
+      try {
+        SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL, tarredSegmentFile, crypterName);
+      } catch (Exception e) {
+        _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
+        LOGGER.error("Segment download failed for {}, crypter:{}", downloadURL, crypterName, e);
+        _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+        Utils.rethrowException(e);
+      }
 
       // Un-tar the segment file
       _eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment from: " + downloadURL);
@@ -177,13 +189,24 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
 
       // Upload the tarred segment
       _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName);
-      SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, segmentName, uploadURL,
-          convertedTarredSegmentFile);
+      boolean uploadSuccessful = true;
+      try {
+        SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, segmentName,
+            uploadURL, convertedTarredSegmentFile);
+      } catch (Exception e) {
+        uploadSuccessful = false;
+        _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L);
+        LOGGER.error("Segment upload failed for segment {}, table {}", segmentName, tableNameWithType, e);
+        _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+      }
       if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
         LOGGER.warn("Failed to delete tarred converted segment: {}",
             convertedTarredSegmentFile.getAbsolutePath());
       }
-      LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
+      if (uploadSuccessful) {
+        LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
+      }
+
       return segmentConversionResult;
     } finally {
       FileUtils.deleteQuietly(tempDataDir);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org