This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5550c24f1e Disable reingestion for Pauseless dedup (#15383) 5550c24f1e is described below commit 5550c24f1e92a40df904af27853f61b5a9098428 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Mar 28 12:45:46 2025 +0530 Disable reingestion for Pauseless dedup (#15383) * Disable reingestion for Pauseless dedup * Emit a metric that tracks how many errored segments were detected * Add new metric for unrecoverable errors * Add comment on metric --------- Co-authored-by: KKCorps <kar...@startee.ai> --- .../pinot/common/metrics/ControllerGauge.java | 10 +++++++- .../realtime/PinotLLCRealtimeSegmentManager.java | 28 ++++++++++++++++++++-- .../RealtimeSegmentValidationManager.java | 3 ++- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index cd6215228c..6418aca8d1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -192,7 +192,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false), // Metric used to track errors during the periodic table retention management - RETENTION_MANAGER_ERROR("retentionManagerError", false); + RETENTION_MANAGER_ERROR("retentionManagerError", false), + + // Metric used to track when segments in error state are detected for pauseless table + PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false), + + // Metric used to track when segments in error state are detected for pauseless table for which needs + // manual intervention for repair + PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount", false); + private final String _gaugeName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 84afaf6fdf..9e8bff5f5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -108,6 +108,7 @@ import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -2359,9 +2360,10 @@ public class PinotLLCRealtimeSegmentManager { * Request body (JSON): * * If segment is in ERROR state in only few replicas but has download URL, we instead trigger a segment reset - * @param realtimeTableName The table name with type, e.g. "myTable_REALTIME" + * @param tableConfig The table config */ - public void repairSegmentsInErrorStateForPauselessConsumption(String realtimeTableName) { + public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); // Fetch ideal state and external view IdealState idealState = getIdealState(realtimeTableName); ExternalView externalView = _helixResourceManager.getTableExternalView(realtimeTableName); @@ -2425,7 +2427,12 @@ public class PinotLLCRealtimeSegmentManager { } } + if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) { + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, + ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0); + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, + ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0); return; } @@ -2434,6 +2441,23 @@ public class PinotLLCRealtimeSegmentManager { segmentsInErrorStateInAtLeastOneReplica.size(), segmentsInErrorStateInAtLeastOneReplica, segmentsInErrorStateInAllReplicas.size(), segmentsInErrorStateInAllReplicas, realtimeTableName); + boolean isPartialUpsertEnabled = + tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL; + boolean isDedupEnabled = tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled(); + if ((isPartialUpsertEnabled || isDedupEnabled)) { + // We do not run reingestion for dedup and partial upsert tables in pauseless as it can + // lead to data inconsistencies + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, + ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); + LOGGER.error("Skipping repair for errored segments in table: {} because dedup or partial upsert is enabled.", + realtimeTableName); + return; + } else { + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, + ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); + } + + for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) { SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName); if (segmentZKMetadata == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 9e9f6ba110..2fcb669335 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -127,7 +127,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); if (isPauselessConsumptionEnabled) { - _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig.getTableName()); + // For pauseless tables without dedup or partial upsert, repair segments in error state + _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig); } else if (_segmentAutoResetOnErrorAtValidation) { // Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method with // additional checks for pauseless consumption --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org