This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e8caf52571 Resets in-progress pauseless table segments (#16904)
9e8caf52571 is described below

commit 9e8caf525710029d29848b38861124443edcd6c4
Author: NOOB <[email protected]>
AuthorDate: Tue Oct 7 07:11:15 2025 +0530

    Resets in-progress pauseless table segments (#16904)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 71 +++++++++++-----------
 .../helix/core/rebalance/TableRebalancer.java      |  2 +-
 .../RealtimeSegmentValidationManager.java          |  2 +-
 ...imeIngestionConsumingTransitionFailureTest.java | 45 ++++++++++++++
 ...imeIngestionConsumingTransitionFailureTest.java | 45 ++++++++++++++
 ...sRealtimeIngestionSegmentCommitFailureTest.java | 43 ++++++++-----
 ...FailureInjectingRealtimeSegmentDataManager.java |  6 +-
 .../FailureInjectingRealtimeTableDataManager.java  | 23 ++++---
 .../utils/FailureInjectingTableConfig.java         | 69 +++++++++++++++++++++
 .../FailureInjectingTableDataManagerProvider.java  | 24 +++++++-
 10 files changed, 265 insertions(+), 65 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8a3dccfaa3b..94f1404ea1b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2608,18 +2608,18 @@ public class PinotLLCRealtimeSegmentManager {
 
   /**
    * Re-ingests segments that are in ERROR state in EV but ONLINE in IS with 
no peer copy on any server. This method
-   * will call the server reingestSegment API
-   * on one of the alive servers that are supposed to host that segment 
according to IdealState.
-   *
-   * API signature:
-   *   POST http://[serverURL]/reingestSegment/[segmentName]
-   *   Request body (JSON):
-   *
+   * will call the server reingestSegment API on one of the alive servers that 
are supposed to host that segment
+   * according to IdealState.
+   * <p>
+   * API signature: POST http://[serverURL]/reingestSegment/[segmentName] 
Request body (JSON):
+   * <p>
    * If segment is in ERROR state in only few replicas but has download URL, 
we instead trigger a segment reset
-   * @param tableConfig The table config
+   *
+   * @param tableConfig                         The table config
+   * @param segmentAutoResetOnErrorAtValidation flag to determine whether to 
reset the error segments or not
    */
   public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig 
tableConfig,
-      boolean repairErrorSegmentsForPartialUpsertOrDedup) {
+      boolean repairErrorSegmentsForPartialUpsertOrDedup, boolean 
segmentAutoResetOnErrorAtValidation) {
     String realtimeTableName = tableConfig.getTableName();
     // Fetch ideal state and external view
     IdealState idealState = getIdealState(realtimeTableName);
@@ -2663,18 +2663,6 @@ public class PinotLLCRealtimeSegmentManager {
         continue;
       }
 
-      // Skip segments that are not ONLINE in ideal state
-      boolean hasOnlineInstance = false;
-      for (String state : idealStateMap.values()) {
-        if (SegmentStateModel.ONLINE.equals(state)) {
-          hasOnlineInstance = true;
-          break;
-        }
-      }
-      if (!hasOnlineInstance) {
-        continue;
-      }
-
       if (numReplicasInError > 0) {
         segmentsInErrorStateInAtLeastOneReplica.add(segmentName);
       }
@@ -2696,17 +2684,12 @@ public class PinotLLCRealtimeSegmentManager {
         segmentsInErrorStateInAtLeastOneReplica.size(), 
segmentsInErrorStateInAtLeastOneReplica,
         segmentsInErrorStateInAllReplicas.size(), 
segmentsInErrorStateInAllReplicas, realtimeTableName);
 
-    if 
(!allowRepairOfErrorSegments(repairErrorSegmentsForPartialUpsertOrDedup, 
tableConfig)) {
-      // We do not run reingestion for dedup and partial upsert tables in 
pauseless as it can
-      // lead to data inconsistencies
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
-          ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
-      return;
-    } else {
-      LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
-          segmentsInErrorStateInAllReplicas.size());
-    }
+    _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+        segmentsInErrorStateInAllReplicas.size());
+
+    boolean repairCommittingSegments =
+        
allowRepairOfCommittingSegments(repairErrorSegmentsForPartialUpsertOrDedup, 
tableConfig);
+    int segmentsInUnRecoverableState = 0;
 
     for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
       SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
@@ -2718,6 +2701,16 @@ public class PinotLLCRealtimeSegmentManager {
       // We only consider segments that are in COMMITTING state for reingestion
       if (segmentZKMetadata.getStatus() == Status.COMMITTING && 
segmentsInErrorStateInAllReplicas.contains(
           segmentName)) {
+
+        if (!repairCommittingSegments) {
+          segmentsInUnRecoverableState += 1;
+          LOGGER.info(
+              "Segment: {} in table: {} is COMMITTING with all replicas in 
ERROR state. Skipping re-ingestion since "
+                  + "repairErrorSegments is false.",
+              segmentName, realtimeTableName);
+          continue;
+        }
+
         LOGGER.info("Segment: {} in table: {} is COMMITTING with all replicas 
in ERROR state. Triggering re-ingestion.",
             segmentName, realtimeTableName);
 
@@ -2737,20 +2730,24 @@ public class PinotLLCRealtimeSegmentManager {
         } catch (Exception e) {
           LOGGER.error("Failed to call reingestSegment for segment: {} on 
server: {}", segmentName, aliveServer, e);
         }
-      } else if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
-        // Trigger reset for segment not in IN_PROGRESS state to download the 
segment from deep store or peer server
+      } else if (segmentAutoResetOnErrorAtValidation) {
         _helixResourceManager.resetSegment(realtimeTableName, segmentName, 
null);
       }
     }
+
+    _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+        ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInUnRecoverableState);
   }
 
   /**
-   * Whether to allow repairing the ERROR segment or not
+   * Whether to allow repairing the ERROR segments with ZK status: COMMITTING.
+   * This method is only useful for pauseless ingestion with features like 
dedup enabled (since repairing committing
+   * segments for such tables can lead to incorrect data).
    * @param repairErrorSegmentsForPartialUpsertOrDedup API context flag, if 
true then always allow repair
    * @param tableConfig tableConfig
-   * @return Returns true if repair is allowed for ERROR segments or not
+   * @return Returns true if repair is allowed
    */
-  public boolean allowRepairOfErrorSegments(boolean 
repairErrorSegmentsForPartialUpsertOrDedup,
+  public boolean allowRepairOfCommittingSegments(boolean 
repairErrorSegmentsForPartialUpsertOrDedup,
       TableConfig tableConfig) {
     if (repairErrorSegmentsForPartialUpsertOrDedup) {
       // If API context has repairErrorSegmentsForPartialUpsertOrDedup=true, 
allow repair.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 2edd6042025..be374dbed6c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1977,7 +1977,7 @@ public class TableRebalancer {
       //       best to return that there is a risk of data loss for pauseless 
enabled tables for segments in COMMITTING
       //       state
       if (_isPauselessEnabled && segmentZKMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.COMMITTING
-          && 
!_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, 
_tableConfig)) {
+          && 
!_pinotLLCRealtimeSegmentManager.allowRepairOfCommittingSegments(false, 
_tableConfig)) {
         return Pair.of(true, generateDataLossRiskMessage(segmentName, false));
       }
       return NO_DATA_LOSS_RISK_RESULT;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index cbe6a45621e..6568c82d477 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -133,7 +133,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     if (isPauselessConsumptionEnabled) {
       // For pauseless tables without dedup or partial upsert, repair segments 
in error state
       
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
-          context._repairErrorSegmentsForPartialUpsertOrDedup);
+          context._repairErrorSegmentsForPartialUpsertOrDedup, 
_segmentAutoResetOnErrorAtValidation);
     } else if (_segmentAutoResetOnErrorAtValidation) {
       // Reset for pauseless tables is already handled in 
repairSegmentsInErrorStateForPauselessConsumption method with
       // additional checks for pauseless consumption
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java
new file mode 100644
index 00000000000..f1204f259db
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class PauselessDedupRealtimeIngestionConsumingTransitionFailureTest
+    extends PauselessDedupRealtimeIngestionSegmentCommitFailureTest {
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" 
+ _controllerConfig.getDataDir());
+    serverConf.setProperty("pinot.server.instance." + 
HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
+        "true");
+    serverConf.setProperty("pinot.server.instance." + 
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
+        
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+    serverConf.setProperty("pinot.server.instance." + 
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+        + getPauselessTableName(), new FailureInjectingTableConfig(false, 
true, getExpectedMaxFailures()).toJson());
+  }
+
+  @Override
+  protected int getExpectedMaxFailures() {
+    return 2;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java
new file mode 100644
index 00000000000..432ead97abd
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class PauselessRealtimeIngestionConsumingTransitionFailureTest
+    extends PauselessRealtimeIngestionSegmentCommitFailureTest {
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" 
+ _controllerConfig.getDataDir());
+    serverConf.setProperty("pinot.server.instance." + 
HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
+        "true");
+    serverConf.setProperty("pinot.server.instance." + 
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
+        
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+    serverConf.setProperty("pinot.server.instance." + 
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+        + getPauselessTableName(), new FailureInjectingTableConfig(false, 
true, getExpectedMaxFailures()).toJson());
+  }
+
+  @Override
+  protected int getExpectedMaxFailures() {
+    return 2;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 7ecb5e5c346..1cd5842fbd9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -33,6 +33,8 @@ import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
 import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -49,7 +51,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingRealtimeTableDataManager.MAX_NUMBER_OF_FAILURES;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -75,6 +76,8 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
         "true");
     serverConf.setProperty("pinot.server.instance." + 
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
         
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+    serverConf.setProperty("pinot.server.instance." + 
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+        + getPauselessTableName(), new FailureInjectingTableConfig(true, 
false, getExpectedMaxFailures()).toJson());
   }
 
   @Override
@@ -115,7 +118,7 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
     waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
 
     // create schema for pauseless table
-    schema.setSchemaName(DEFAULT_TABLE_NAME);
+    schema.setSchemaName(getPauselessTableName());
     addSchema(schema);
 
     // add pauseless table
@@ -136,8 +139,12 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
 
     addTableConfig(tableConfig);
     String realtimeTableName = tableConfig.getTableName();
-    TestUtils.waitForCondition(aVoid -> 
getNumErrorSegmentsInEV(realtimeTableName) == MAX_NUMBER_OF_FAILURES, 600_000L,
-        "Segments still not in error state");
+    TestUtils.waitForCondition(aVoid -> 
getNumErrorSegmentsInEV(realtimeTableName) == getExpectedMaxFailures(),
+        600_000L, "Segments still not in error state");
+  }
+
+  protected int getExpectedMaxFailures() {
+    return 10;
   }
 
   protected void setMaxSegmentCompletionTimeMillis() {
@@ -168,10 +175,10 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
   @Test
   public void testSegmentAssignment()
       throws Exception {
-    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    String pauselessTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getPauselessTableName());
 
     // 1) Capture which segments went into the ERROR state
-    List<String> erroredSegments = getErrorSegmentsInEV(realtimeTableName);
+    List<String> erroredSegments = getSegmentsInEV(pauselessTableName, 
SegmentStateModel.ERROR);
     assertFalse(erroredSegments.isEmpty(), "No segments found in ERROR state, 
expected at least one.");
 
     // Let the RealtimeSegmentValidationManager run so it can fix up segments
@@ -179,30 +186,34 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
     _controllerStarter.getRealtimeSegmentValidationManager().run();
 
     // Wait until there are no ERROR segments in the ExternalView
-    TestUtils.waitForCondition(aVoid -> 
getErrorSegmentsInEV(realtimeTableName).isEmpty(), 600_000L,
-        "Some segments are still in ERROR state after resetSegments()");
+    TestUtils.waitForCondition(aVoid -> getSegmentsInEV(pauselessTableName, 
SegmentStateModel.ERROR).isEmpty(),
+        600_000L, "Some segments are still in ERROR state after 
resetSegments()");
+    // Segment in EV must not be offline for pauseless table
+    TestUtils.waitForCondition(aVoid -> getSegmentsInEV(pauselessTableName, 
SegmentStateModel.OFFLINE).isEmpty(),
+        30_000L, "Some segments are in OFFLINE state after resetSegments()");
 
     // Finally compare metadata across your two tables
-    
compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(realtimeTableName),
-        
_helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2)));
+    
compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(pauselessTableName),
+        _helixResourceManager.getSegmentsZKMetadata(
+            
TableNameBuilder.REALTIME.tableNameWithType(getNonPauselessTableName())));
   }
 
   /**
-   * Returns the list of segment names in ERROR state from the ExternalView of 
the given table.
+   * Returns the list of segment names in the given state from the 
ExternalView of the given table.
    */
-  private List<String> getErrorSegmentsInEV(String realtimeTableName) {
+  private List<String> getSegmentsInEV(String realtimeTableName, String 
status) {
     ExternalView externalView = _helixResourceManager.getHelixAdmin()
         .getResourceExternalView(_helixResourceManager.getHelixClusterName(), 
realtimeTableName);
     if (externalView == null) {
       return List.of();
     }
-    List<String> errorSegments = new ArrayList<>();
+    List<String> segmentsToReturn = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
-      if (entry.getValue().containsValue(SegmentStateModel.ERROR)) {
-        errorSegments.add(entry.getKey());
+      if (entry.getValue().containsValue(status)) {
+        segmentsToReturn.add(entry.getKey());
       }
     }
-    return errorSegments;
+    return segmentsToReturn;
   }
 
   private void compareZKMetadataForSegments(List<SegmentZKMetadata> 
segmentsZKMetadata,
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
index ee13b49e08b..fdc555a142c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
@@ -50,13 +50,15 @@ public class FailureInjectingRealtimeSegmentDataManager 
extends RealtimeSegmentD
       RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator 
consumerCoordinator,
       ServerMetrics serverMetrics, boolean failCommit, 
PartitionDedupMetadataManager partitionDedupMetadataManager,
-      BooleanSupplier isTableReadyToConsumeData)
+      BooleanSupplier isTableReadyToConsumeData, boolean 
failConsumingTransition)
       throws AttemptsExceededException, RetriableOperationException {
     // Pass through to the real parent constructor
     super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir, indexLoadingConfig, schema,
         llcSegmentName, consumerCoordinator, serverMetrics, null /* no 
PartitionUpsertMetadataManager */,
         partitionDedupMetadataManager, isTableReadyToConsumeData);
-
+    if (failConsumingTransition) {
+      throw new RuntimeException("Forced to fail the consuming transition");
+    }
     _failCommit = failCommit;
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 994800ea70d..b85c316e768 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -22,9 +22,9 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -38,16 +38,18 @@ import 
org.apache.pinot.spi.utils.retry.RetriableOperationException;
 
 
 public class FailureInjectingRealtimeTableDataManager extends 
RealtimeTableDataManager {
-  public static final int MAX_NUMBER_OF_FAILURES = 10;
   private final AtomicInteger _numberOfFailures = new AtomicInteger(0);
+  private final FailureInjectingTableConfig _failureInjectingTableConfig;
 
   public FailureInjectingRealtimeTableDataManager(Semaphore 
segmentBuildSemaphore) {
-    this(segmentBuildSemaphore, () -> true);
+    this(segmentBuildSemaphore, () -> true, null);
   }
 
   public FailureInjectingRealtimeTableDataManager(Semaphore 
segmentBuildSemaphore,
-      Supplier<Boolean> isServerReadyToServeQueries) {
+      Supplier<Boolean> isServerReadyToServeQueries,
+      @Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
     super(segmentBuildSemaphore, isServerReadyToServeQueries);
+    _failureInjectingTableConfig = failureInjectingTableConfig;
   }
 
   @Override
@@ -57,12 +59,19 @@ public class FailureInjectingRealtimeTableDataManager 
extends RealtimeTableDataM
       PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isTableReadyToConsumeData)
       throws AttemptsExceededException, RetriableOperationException {
 
-    boolean addFailureToCommits = 
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
-    if (addFailureToCommits && _numberOfFailures.getAndIncrement() >= 
MAX_NUMBER_OF_FAILURES) {
+    boolean addFailureToCommits =
+        (_failureInjectingTableConfig != null) && 
(_failureInjectingTableConfig.isFailCommit());
+    if (addFailureToCommits && (_numberOfFailures.getAndIncrement() >= 
_failureInjectingTableConfig.getMaxFailures())) {
       addFailureToCommits = false;
     }
+    boolean failConsumingTransition =
+        (_failureInjectingTableConfig != null) && 
(_failureInjectingTableConfig.isFailConsumingSegment());
+    if (failConsumingTransition && (_numberOfFailures.getAndIncrement()
+        >= _failureInjectingTableConfig.getMaxFailures())) {
+      failConsumingTransition = false;
+    }
     return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, 
tableConfig, this, _indexDir.getAbsolutePath(),
         indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, 
_serverMetrics, addFailureToCommits,
-        partitionDedupMetadataManager, isTableReadyToConsumeData);
+        partitionDedupMetadataManager, isTableReadyToConsumeData, 
failConsumingTransition);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java
new file mode 100644
index 00000000000..6c856014627
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.utils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+public class FailureInjectingTableConfig {
+  private final boolean _failCommit;
+  private final boolean _failConsumingSegment;
+  private final int _maxFailures;
+
+  @JsonCreator
+  public FailureInjectingTableConfig(@JsonProperty("failCommit") boolean 
failCommit,
+      @JsonProperty("failConsumingSegment") boolean failConsumingSegment,
+      @JsonProperty("maxFailures") int maxFailures) {
+    _failCommit = failCommit;
+    _failConsumingSegment = failConsumingSegment;
+    _maxFailures = maxFailures;
+  }
+
+  public boolean isFailConsumingSegment() {
+    return _failConsumingSegment;
+  }
+
+  public boolean isFailCommit() {
+    return _failCommit;
+  }
+
+  public int getMaxFailures() {
+    return _maxFailures;
+  }
+
+  public String toJson() {
+    try {
+      return new ObjectMapper().writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "FailureInjectingTableConfig{"
+        + "_failCommit=" + _failCommit
+        + ", _failConsumingSegment=" + _failConsumingSegment
+        + ", _maxFailures=" + _maxFailures
+        + '}';
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index 67cca949e91..dfa5cc32b43 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.integration.tests.realtime.utils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.cache.Cache;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -38,19 +40,25 @@ import 
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 /**
  * Default implementation of {@link TableDataManagerProvider}.
  */
 public class FailureInjectingTableDataManagerProvider implements 
TableDataManagerProvider {
+  public static final String FAILURE_CONFIG_KEY = "failure.config";
+
   private InstanceDataManagerConfig _instanceDataManagerConfig;
   private HelixManager _helixManager;
   private SegmentLocks _segmentLocks;
   private Semaphore _segmentBuildSemaphore;
+  private Map<String, FailureInjectingTableConfig> 
_tableNameToFailureInjectingConfig;
+  private PinotConfiguration _pinotConfiguration;
 
   @Override
   public void init(InstanceDataManagerConfig instanceDataManagerConfig, 
HelixManager helixManager,
@@ -60,6 +68,7 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
     _segmentLocks = segmentLocks;
     int maxParallelSegmentBuilds = 
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
     _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new 
Semaphore(maxParallelSegmentBuilds, true) : null;
+    _pinotConfiguration = instanceDataManagerConfig.getConfig();
   }
 
   @Override
@@ -85,8 +94,21 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
                   + "configured the segmentstore uri. Configure the server 
config %s",
               StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
         }
+        String failureConfigKey =
+            FAILURE_CONFIG_KEY + "." + 
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
+        FailureInjectingTableConfig failureInjectingTableConfig = null;
+        if (_pinotConfiguration.containsKey(failureConfigKey)) {
+          try {
+            failureInjectingTableConfig =
+                new 
ObjectMapper().readValue(_pinotConfiguration.getProperty(failureConfigKey),
+                    FailureInjectingTableConfig.class);
+          } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+          }
+        }
         tableDataManager =
-            new 
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore, 
isServerReadyToServeQueries);
+            new 
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore, 
isServerReadyToServeQueries,
+                failureInjectingTableConfig);
         break;
       default:
         throw new IllegalStateException();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to