This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 894665cdd31 controller: reuse error-state repair for non-pauseless; delete redundant reingest method (#16811)
894665cdd31 is described below

commit 894665cdd319336c974c09b072f627521cfe1ff6
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Dec 3 22:54:47 2025 -0800

    controller: reuse error-state repair for non-pauseless; delete redundant reingest method (#16811)
    
    - Reuse repairSegmentsInErrorStateForPauselessConsumption() to repair errored
      segments for both pauseless and non-pauseless tables
    - Remove reingestCommittingSegmentsForPauselessDisabled() and route call sites
      to the shared repair flow
    - Keep a single reingestion entrypoint: reingestSegment(table, segment, instances)
      with simple predicates (hasOnlineInstance, maybeResetIfNotInProgress)
    - Update RealtimeSegmentValidationManager to always invoke the shared repair flow
      (and preserve optional auto-reset behavior)
    - Adjust RealtimeSegmentValidationManagerTest expectations accordingly
    
    Behavioral notes:
    - Only re-ingests COMMITTING LLC segments with start/end offsets and no download
      URL when all replicas are in ERROR and the segment is ONLINE in IdealState
    - Otherwise resets segments not in IN_PROGRESS to fetch from deep store/peer
    - Applies only to tables without dedup or partial upsert unless explicitly allowed
      via allowRepairOfErrorSegments()
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 76 +++++++++++++++------
 .../RealtimeSegmentValidationManager.java          | 23 ++++---
 .../RealtimeSegmentValidationManagerTest.java      | 79 +++++++++++++++++++---
 3 files changed, 137 insertions(+), 41 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9efcc895d64..22743696c48 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2421,8 +2421,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (!invalidSegments.isEmpty()) {
         LOGGER.warn("Cannot commit segments that are not in CONSUMING state. 
All consuming segments: {}, "
                 + "provided segments to commit: {}. Ignoring all non-consuming 
segments, sampling 10: {}",
-            allConsumingSegments,
-            segmentsToCommitStr, 
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
+            allConsumingSegments, segmentsToCommitStr, 
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
       }
       return validSegmentsToCommit;
     }
@@ -2640,6 +2639,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   /**
+   * Repair segments in error state:
    * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
    * will call the server reingestSegment API on one of the alive servers that 
are supposed to host that segment
    * according to IdealState.
@@ -2648,18 +2648,17 @@ public class PinotLLCRealtimeSegmentManager {
    * <p>
    * If segment is in ERROR state in only few replicas but has download URL, 
we instead trigger a segment reset
    *
-   * @param tableConfig                         The table config
-   * @param segmentAutoResetOnErrorAtValidation flag to determine whether to 
reset the error segments or not
+   * @param tableConfig The table config
    */
-  public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig 
tableConfig,
-      boolean repairErrorSegmentsForPartialUpsertOrDedup, boolean 
segmentAutoResetOnErrorAtValidation) {
+  public void repairSegmentsInErrorState(TableConfig tableConfig,
+      boolean repairErrorSegmentsForPartialUpsertOrDedup) {
+    boolean isPauselessTable = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
     String realtimeTableName = tableConfig.getTableName();
     // Fetch ideal state and external view
     IdealState idealState = getIdealState(realtimeTableName);
     ExternalView externalView = 
_helixResourceManager.getTableExternalView(realtimeTableName);
     if (externalView == null) {
-      LOGGER.warn(
-          "External view not found for table: {}, skipping repairing segments 
in error state for pauseless consumption",
+      LOGGER.warn("External view not found for table: {}, skipping repairing 
segments in error state",
           realtimeTableName);
       return;
     }
@@ -2706,9 +2705,14 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
-          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
+      if (isPauselessTable) {
+        _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+            0);
+        _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+            ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
0);
+      } else {
+        _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
+      }
       return;
     }
 
@@ -2717,12 +2721,26 @@ public class PinotLLCRealtimeSegmentManager {
         segmentsInErrorStateInAtLeastOneReplica.size(), 
segmentsInErrorStateInAtLeastOneReplica,
         segmentsInErrorStateInAllReplicas.size(), 
segmentsInErrorStateInAllReplicas, realtimeTableName);
 
-    _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
-        segmentsInErrorStateInAllReplicas.size());
-
     boolean repairCommittingSegments =
         
allowRepairOfCommittingSegments(repairErrorSegmentsForPartialUpsertOrDedup, 
tableConfig);
     int segmentsInUnRecoverableState = 0;
+    if (isPauselessTable && !repairCommittingSegments) {
+      // We do not run reingestion for dedup and partial upsert tables in 
pauseless as it can
+      // lead to data inconsistencies
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
+      return;
+    } else {
+      LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
+    }
+
+    if (isPauselessTable) {
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+          segmentsInErrorStateInAllReplicas.size());
+    } else {
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE,
+          segmentsInErrorStateInAllReplicas.size());
+    }
 
     for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
       SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
@@ -2731,10 +2749,11 @@ public class PinotLLCRealtimeSegmentManager {
             realtimeTableName);
         continue;
       }
-      // We only consider segments that are in COMMITTING state for reingestion
-      if (segmentZKMetadata.getStatus() == Status.COMMITTING && 
segmentsInErrorStateInAllReplicas.contains(
-          segmentName)) {
 
+      Status status = segmentZKMetadata.getStatus();
+
+      // 1) For COMMITTING segments where all replicas are in ERROR, try 
re-ingestion (subject to policy)
+      if (status == Status.COMMITTING && 
segmentsInErrorStateInAllReplicas.contains(segmentName)) {
         if (!repairCommittingSegments) {
           segmentsInUnRecoverableState += 1;
           LOGGER.info(
@@ -2756,20 +2775,33 @@ public class PinotLLCRealtimeSegmentManager {
               realtimeTableName);
           continue;
         }
-
         try {
           triggerReingestion(aliveServer, segmentName);
           LOGGER.info("Successfully triggered re-ingestion for segment: {} on 
server: {}", segmentName, aliveServer);
         } catch (Exception e) {
-          LOGGER.error("Failed to call reingestSegment for segment: {} on 
server: {}", segmentName, aliveServer, e);
+          LOGGER.error("Failed to trigger re-ingestion for segment: {} on 
server: {}", segmentName, aliveServer, e);
         }
-      } else if (segmentAutoResetOnErrorAtValidation) {
+        continue;
+      }
+
+      // 2) For IN_PROGRESS segments where all replicas are in ERROR, issue a 
reset to allow ERROR->OFFLINE and retry
+      if (status == Status.IN_PROGRESS && 
segmentsInErrorStateInAllReplicas.contains(segmentName)) {
+        LOGGER.info("Resetting IN_PROGRESS segment: {} in table: {} since all 
replicas are in ERROR state.",
+            segmentName, realtimeTableName);
+        _helixResourceManager.resetSegment(realtimeTableName, segmentName, 
null);
+        continue;
+      }
+
+      // 3) For all other non-IN_PROGRESS statuses with any ERROR replica, 
trigger a reset to download/refresh
+      if (status != Status.IN_PROGRESS) {
         _helixResourceManager.resetSegment(realtimeTableName, segmentName, 
null);
       }
     }
 
-    _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
-        ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInUnRecoverableState);
+    if (isPauselessTable) {
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInUnRecoverableState);
+    }
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 6568c82d477..15bf560ef5a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -129,15 +129,18 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       LOGGER.info("Skipping segment-level validation for table: {}", 
tableConfig.getTableName());
     }
 
-    boolean isPauselessConsumptionEnabled = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
-    if (isPauselessConsumptionEnabled) {
-      // For pauseless tables without dedup or partial upsert, repair segments 
in error state
-      
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
-          context._repairErrorSegmentsForPartialUpsertOrDedup, 
_segmentAutoResetOnErrorAtValidation);
-    } else if (_segmentAutoResetOnErrorAtValidation) {
-      // Reset for pauseless tables is already handled in 
repairSegmentsInErrorStateForPauselessConsumption method with
-      // additional checks for pauseless consumption
-      _pinotHelixResourceManager.resetSegments(tableConfig.getTableName(), 
null, true);
+    boolean isPauselessTable = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    if (isPauselessTable || _segmentAutoResetOnErrorAtValidation) {
+      // For realtime tables without dedup or partial upsert, repair segments 
in error state.
+      // When pauseless consumption is disabled, this behavior remains gated by
+      // _segmentAutoResetOnErrorAtValidation to preserve legacy semantics.
+      _llcRealtimeSegmentManager.repairSegmentsInErrorState(tableConfig,
+          context._repairErrorSegmentsForPartialUpsertOrDedup);
+    } else {
+      LOGGER.debug(
+          "Skipping segment error repair for table {} because pauseless 
consumption is disabled and "
+              + "_segmentAutoResetOnErrorAtValidation=false",
+          tableConfig.getTableName());
     }
   }
 
@@ -183,7 +186,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       // The table was previously paused due to exceeding resource 
utilization, but the current status cannot be
       // determined. To be safe, leave it as paused and once the status is 
available take the correct action
       LOGGER.warn("Resource utilization limit could not be determined for for 
table: {}, and it is paused, leave it as "
-              + "paused", tableNameWithType);
+          + "paused", tableNameWithType);
       return false;
     }
     _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
index 4c74c0bc1f3..fb511b1feed 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -23,8 +23,11 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
@@ -64,6 +67,52 @@ public class RealtimeSegmentValidationManagerTest {
             _llcRealtimeSegmentManager, null, _controllerMetrics, 
_storageQuotaChecker, _resourceUtilizationManager);
   }
 
+  @Test
+  public void testReingestionCalledWhenPauselessDisabled() {
+    String rawTable = "testTable";
+    String tableName = rawTable + "_REALTIME";
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+
+    // Force shouldEnsureConsuming=false by simulating admin pause
+    
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+    when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
+        .thenReturn(new PauseStatusDetails(true, null, 
PauseState.ReasonCode.ADMINISTRATIVE, null, null));
+
+    _realtimeSegmentValidationManager.processTable(tableName, new 
RealtimeSegmentValidationManager.Context());
+
+    verify(_llcRealtimeSegmentManager, times(1))
+        .repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
+  }
+
+  @Test
+  public void testNoReingestionWhenPauselessEnabled() {
+    String rawTable = "testTable";
+    String tableName = rawTable + "_REALTIME";
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+
+    // Enable pauseless on ingestion config
+    org.apache.pinot.spi.config.table.ingestion.IngestionConfig 
ingestionConfig =
+        new org.apache.pinot.spi.config.table.ingestion.IngestionConfig();
+    org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig 
streamIngestionConfig =
+        new org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig(
+            
java.util.List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()));
+    streamIngestionConfig.setPauselessConsumptionEnabled(true);
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    tableConfig.setIngestionConfig(ingestionConfig);
+
+    // Force shouldEnsureConsuming=false by simulating admin pause (to avoid 
ensureAllPartitionsConsuming)
+    
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+    when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
+        .thenReturn(new PauseStatusDetails(true, null, 
PauseState.ReasonCode.ADMINISTRATIVE, null, null));
+
+    _realtimeSegmentValidationManager.processTable(tableName, new 
RealtimeSegmentValidationManager.Context());
+
+    verify(_llcRealtimeSegmentManager, times(1))
+        .repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
+  }
+
   @AfterMethod
   public void tearDown()
       throws Exception {
@@ -77,21 +126,32 @@ public class RealtimeSegmentValidationManagerTest {
         {true, PauseState.ReasonCode.ADMINISTRATIVE, 
UtilizationChecker.CheckResult.PASS, false, false},
 
         // Resource utilization exceeded and pause state is updated, should 
return false
-        {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.FAIL, false,
-            false},
+        {
+            false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.FAIL,
+            false,
+            false
+        },
 
         // Resource utilization is within limits but was previously paused due 
to resource utilization,
         // should return true
-        {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.PASS, false,
-            true},
+        {
+            true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.PASS, false,
+            true
+        },
 
         // Resource utilization is STALE but was previously paused due to 
resource utilization, should return false
-        {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.UNDETERMINED,
-            false, false},
+        {
+            true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+            UtilizationChecker.CheckResult.UNDETERMINED,
+            false, false
+        },
 
         // Resource utilization is STALE but was not previously paused due to 
resource utilization, should return true
-        {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
UtilizationChecker.CheckResult.UNDETERMINED,
-            false, true},
+        {
+            false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+            UtilizationChecker.CheckResult.UNDETERMINED,
+            false, true
+        },
 
         // Resource utilization is within limits but was previously paused due 
to storage quota exceeded,
         // should return false
@@ -101,7 +161,8 @@ public class RealtimeSegmentValidationManagerTest {
         {false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, 
UtilizationChecker.CheckResult.PASS, true, false},
 
         // Storage quota within limits but was previously paused due to 
storage quota exceeded, should return true
-        {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, 
UtilizationChecker.CheckResult.PASS, false, true}};
+        {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, 
UtilizationChecker.CheckResult.PASS, false, true}
+    };
   }
 
   @Test(dataProvider = "testCases")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to