This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 91e929adf3 Allow RealtimeSegmentValidationManager to fix error segments for partial upsert and dedup tables. (#15987)
91e929adf3 is described below

commit 91e929adf382dc3705aa4a89718f99cecdde5874
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Sat Jun 7 04:01:57 2025 +0530

    Allow RealtimeSegmentValidationManager to fix error segments for partial upsert and dedup tables. (#15987)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java     | 10 ++++++++--
 .../validation/RealtimeSegmentValidationManager.java | 20 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a7b04e2766..012ef8d2d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2434,7 +2434,8 @@ public class PinotLLCRealtimeSegmentManager {
    * If segment is in ERROR state in only few replicas but has download URL, 
we instead trigger a segment reset
    * @param tableConfig The table config
    */
-  public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig 
tableConfig) {
+  public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig 
tableConfig,
+      boolean repairErrorSegmentsForPartialUpsertOrDedup) {
     String realtimeTableName = tableConfig.getTableName();
     // Fetch ideal state and external view
     IdealState idealState = getIdealState(realtimeTableName);
@@ -2516,7 +2517,7 @@ public class PinotLLCRealtimeSegmentManager {
     boolean isPartialUpsertEnabled =
         tableConfig.getUpsertConfig() != null && 
tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
     boolean isDedupEnabled = tableConfig.getDedupConfig() != null && 
tableConfig.getDedupConfig().isDedupEnabled();
-    if ((isPartialUpsertEnabled || isDedupEnabled)) {
+    if ((isPartialUpsertEnabled || isDedupEnabled) && 
!repairErrorSegmentsForPartialUpsertOrDedup) {
       // We do not run reingestion for dedup and partial upsert tables in 
pauseless as it can
       // lead to data inconsistencies
       _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
@@ -2525,6 +2526,11 @@ public class PinotLLCRealtimeSegmentManager {
           realtimeTableName);
       return;
     } else {
+      if ((isPartialUpsertEnabled || isDedupEnabled)) {
+        LOGGER.info(
+            "Repairing error segments in table: {} as 
repairErrorSegmentForPartialUpsertOrDedup is set to true",
+            realtimeTableName);
+      }
       _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
           ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 9fa236ecde..21d3d42802 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -67,6 +67,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   public static final String OFFSET_CRITERIA = "offsetCriteria";
   public static final String RUN_SEGMENT_LEVEL_VALIDATION = 
"runSegmentLevelValidation";
+  public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP 
=
+      "repairErrorSegmentsForPartialUpsertOrDedup";
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -96,6 +98,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       context._runSegmentLevelValidation = true;
       _lastSegmentLevelValidationRunTimeMs = currentTimeMs;
     }
+    context._repairErrorSegmentsForPartialUpsertOrDedup =
+        
shouldRepairErrorSegmentsForPartialUpsertOrDedup(periodicTaskProperties);
     String offsetCriteriaStr = 
periodicTaskProperties.getProperty(OFFSET_CRITERIA);
     if (offsetCriteriaStr != null) {
       context._offsetCriteria = new 
OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
@@ -129,7 +133,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     boolean isPauselessConsumptionEnabled = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
     if (isPauselessConsumptionEnabled) {
       // For pauseless tables without dedup or partial upsert, repair segments 
in error state
-      
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig);
+      
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
+          context._repairErrorSegmentsForPartialUpsertOrDedup);
     } else if (_segmentAutoResetOnErrorAtValidation) {
       // Reset for pauseless tables is already handled in 
repairSegmentsInErrorStateForPauselessConsumption method with
       // additional checks for pauseless consumption
@@ -261,6 +266,18 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     return runValidation || timeThresholdMet;
   }
 
+  private boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(Properties 
periodicTaskProperties) {
+    return 
Optional.ofNullable(periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP))
+        .map(value -> {
+          try {
+            return Boolean.parseBoolean(value);
+          } catch (Exception e) {
+            return false;
+          }
+        })
+        .orElse(false);
+  }
+
   @Override
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
     for (String tableNameWithType : tableNamesWithType) {
@@ -288,6 +305,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   public static final class Context {
     private boolean _runSegmentLevelValidation;
+    private boolean _repairErrorSegmentsForPartialUpsertOrDedup;
     private OffsetCriteria _offsetCriteria;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to