This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5550c24f1e Disable reingestion for Pauseless dedup (#15383)
5550c24f1e is described below

commit 5550c24f1e92a40df904af27853f61b5a9098428
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Mar 28 12:45:46 2025 +0530

    Disable reingestion for Pauseless dedup (#15383)
    
    * Disable reingestion for Pauseless dedup
    
    * Emit a metric that tracks how many errored segments were detected
    
    * Add new metric for unrecoverable errors
    
    * Add comment on metric
    
    ---------
    
    Co-authored-by: KKCorps <kar...@startee.ai>
---
 .../pinot/common/metrics/ControllerGauge.java      | 10 +++++++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 28 ++++++++++++++++++++--
 .../RealtimeSegmentValidationManager.java          |  3 ++-
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index cd6215228c..6418aca8d1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -192,7 +192,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false),
 
   // Metric used to track errors during the periodic table retention management
-  RETENTION_MANAGER_ERROR("retentionManagerError", false);
+  RETENTION_MANAGER_ERROR("retentionManagerError", false),
+
+  // Metric used to track when segments in error state are detected for 
pauseless table
+  PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false),
+
+  // Metric used to track when segments in error state are detected for 
pauseless table for which needs
+  // manual intervention for repair
+  
PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount",
 false);
+
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 84afaf6fdf..9e8bff5f5d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -108,6 +108,7 @@ import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -2359,9 +2360,10 @@ public class PinotLLCRealtimeSegmentManager {
    *   Request body (JSON):
    *
    * If segment is in ERROR state in only few replicas but has download URL, 
we instead trigger a segment reset
-   * @param realtimeTableName The table name with type, e.g. "myTable_REALTIME"
+   * @param tableConfig The table config
    */
-  public void repairSegmentsInErrorStateForPauselessConsumption(String 
realtimeTableName) {
+  public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig 
tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
     // Fetch ideal state and external view
     IdealState idealState = getIdealState(realtimeTableName);
     ExternalView externalView = 
_helixResourceManager.getTableExternalView(realtimeTableName);
@@ -2425,7 +2427,12 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
 
+
     if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
       return;
     }
 
@@ -2434,6 +2441,23 @@ public class PinotLLCRealtimeSegmentManager {
         segmentsInErrorStateInAtLeastOneReplica.size(), 
segmentsInErrorStateInAtLeastOneReplica,
         segmentsInErrorStateInAllReplicas.size(), 
segmentsInErrorStateInAllReplicas, realtimeTableName);
 
+    boolean isPartialUpsertEnabled =
+        tableConfig.getUpsertConfig() != null && 
tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
+    boolean isDedupEnabled = tableConfig.getDedupConfig() != null && 
tableConfig.getDedupConfig().isDedupEnabled();
+    if ((isPartialUpsertEnabled || isDedupEnabled)) {
+      // We do not run reingestion for dedup and partial upsert tables in 
pauseless as it can
+      // lead to data inconsistencies
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
+      LOGGER.error("Skipping repair for errored segments in table: {} because 
dedup or partial upsert is enabled.",
+          realtimeTableName);
+      return;
+    } else {
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
+    }
+
+
     for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
       SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName);
       if (segmentZKMetadata == null) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 9e9f6ba110..2fcb669335 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -127,7 +127,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
 
     boolean isPauselessConsumptionEnabled = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
     if (isPauselessConsumptionEnabled) {
-      
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig.getTableName());
+      // For pauseless tables without dedup or partial upsert, repair segments 
in error state
+      
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig);
     } else if (_segmentAutoResetOnErrorAtValidation) {
       // Reset for pauseless tables is already handled in 
repairSegmentsInErrorStateForPauselessConsumption method with
       // additional checks for pauseless consumption


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to