This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9e8caf52571 Resets in-progress pauseless table segments (#16904)
9e8caf52571 is described below
commit 9e8caf525710029d29848b38861124443edcd6c4
Author: NOOB <[email protected]>
AuthorDate: Tue Oct 7 07:11:15 2025 +0530
Resets in-progress pauseless table segments (#16904)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 71 +++++++++++-----------
.../helix/core/rebalance/TableRebalancer.java | 2 +-
.../RealtimeSegmentValidationManager.java | 2 +-
...imeIngestionConsumingTransitionFailureTest.java | 45 ++++++++++++++
...imeIngestionConsumingTransitionFailureTest.java | 45 ++++++++++++++
...sRealtimeIngestionSegmentCommitFailureTest.java | 43 ++++++++-----
...FailureInjectingRealtimeSegmentDataManager.java | 6 +-
.../FailureInjectingRealtimeTableDataManager.java | 23 ++++---
.../utils/FailureInjectingTableConfig.java | 69 +++++++++++++++++++++
.../FailureInjectingTableDataManagerProvider.java | 24 +++++++-
10 files changed, 265 insertions(+), 65 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8a3dccfaa3b..94f1404ea1b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2608,18 +2608,18 @@ public class PinotLLCRealtimeSegmentManager {
/**
* Re-ingests segments that are in ERROR state in EV but ONLINE in IS with
no peer copy on any server. This method
- * will call the server reingestSegment API
- * on one of the alive servers that are supposed to host that segment
according to IdealState.
- *
- * API signature:
- * POST http://[serverURL]/reingestSegment/[segmentName]
- * Request body (JSON):
- *
+ * will call the server reingestSegment API on one of the alive servers that
are supposed to host that segment
+ * according to IdealState.
+ * <p>
+ * API signature: POST http://[serverURL]/reingestSegment/[segmentName]
Request body (JSON):
+ * <p>
* If segment is in ERROR state in only few replicas but has download URL,
we instead trigger a segment reset
- * @param tableConfig The table config
+ *
+ * @param tableConfig The table config
+ * @param segmentAutoResetOnErrorAtValidation flag to determine whether to
reset the error segments or not
*/
public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig
tableConfig,
- boolean repairErrorSegmentsForPartialUpsertOrDedup) {
+ boolean repairErrorSegmentsForPartialUpsertOrDedup, boolean
segmentAutoResetOnErrorAtValidation) {
String realtimeTableName = tableConfig.getTableName();
// Fetch ideal state and external view
IdealState idealState = getIdealState(realtimeTableName);
@@ -2663,18 +2663,6 @@ public class PinotLLCRealtimeSegmentManager {
continue;
}
- // Skip segments that are not ONLINE in ideal state
- boolean hasOnlineInstance = false;
- for (String state : idealStateMap.values()) {
- if (SegmentStateModel.ONLINE.equals(state)) {
- hasOnlineInstance = true;
- break;
- }
- }
- if (!hasOnlineInstance) {
- continue;
- }
-
if (numReplicasInError > 0) {
segmentsInErrorStateInAtLeastOneReplica.add(segmentName);
}
@@ -2696,17 +2684,12 @@ public class PinotLLCRealtimeSegmentManager {
segmentsInErrorStateInAtLeastOneReplica.size(),
segmentsInErrorStateInAtLeastOneReplica,
segmentsInErrorStateInAllReplicas.size(),
segmentsInErrorStateInAllReplicas, realtimeTableName);
- if
(!allowRepairOfErrorSegments(repairErrorSegmentsForPartialUpsertOrDedup,
tableConfig)) {
- // We do not run reingestion for dedup and partial upsert tables in
pauseless as it can
- // lead to data inconsistencies
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
- ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
- return;
- } else {
- LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
- segmentsInErrorStateInAllReplicas.size());
- }
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+ segmentsInErrorStateInAllReplicas.size());
+
+ boolean repairCommittingSegments =
+
allowRepairOfCommittingSegments(repairErrorSegmentsForPartialUpsertOrDedup,
tableConfig);
+ int segmentsInUnRecoverableState = 0;
for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
@@ -2718,6 +2701,16 @@ public class PinotLLCRealtimeSegmentManager {
// We only consider segments that are in COMMITTING state for reingestion
if (segmentZKMetadata.getStatus() == Status.COMMITTING &&
segmentsInErrorStateInAllReplicas.contains(
segmentName)) {
+
+ if (!repairCommittingSegments) {
+ segmentsInUnRecoverableState += 1;
+ LOGGER.info(
+ "Segment: {} in table: {} is COMMITTING with all replicas in
ERROR state. Skipping re-ingestion since "
+ + "repairErrorSegments is false.",
+ segmentName, realtimeTableName);
+ continue;
+ }
+
LOGGER.info("Segment: {} in table: {} is COMMITTING with all replicas
in ERROR state. Triggering re-ingestion.",
segmentName, realtimeTableName);
@@ -2737,20 +2730,24 @@ public class PinotLLCRealtimeSegmentManager {
} catch (Exception e) {
LOGGER.error("Failed to call reingestSegment for segment: {} on
server: {}", segmentName, aliveServer, e);
}
- } else if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
- // Trigger reset for segment not in IN_PROGRESS state to download the
segment from deep store or peer server
+ } else if (segmentAutoResetOnErrorAtValidation) {
_helixResourceManager.resetSegment(realtimeTableName, segmentName,
null);
}
}
+
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInUnRecoverableState);
}
/**
- * Whether to allow repairing the ERROR segment or not
+ * Whether to allow repairing the ERROR segments with ZK status: COMMITTING
+ * This method is only useful for pauseless ingestion with features like
dedup enabled (Since repairing committing
+ * segments for such tables can lead to incorrect data).
* @param repairErrorSegmentsForPartialUpsertOrDedup API context flag, if
true then always allow repair
* @param tableConfig tableConfig
- * @return Returns true if repair is allowed for ERROR segments or not
+ * @return Returns true if repair is allowed
*/
- public boolean allowRepairOfErrorSegments(boolean
repairErrorSegmentsForPartialUpsertOrDedup,
+ public boolean allowRepairOfCommittingSegments(boolean
repairErrorSegmentsForPartialUpsertOrDedup,
TableConfig tableConfig) {
if (repairErrorSegmentsForPartialUpsertOrDedup) {
// If API context has repairErrorSegmentsForPartialUpsertOrDedup=true,
allow repair.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 2edd6042025..be374dbed6c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1977,7 +1977,7 @@ public class TableRebalancer {
// best to return that there is a risk of data loss for pauseless
enabled tables for segments in COMMITTING
// state
if (_isPauselessEnabled && segmentZKMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.COMMITTING
- &&
!_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false,
_tableConfig)) {
+ &&
!_pinotLLCRealtimeSegmentManager.allowRepairOfCommittingSegments(false,
_tableConfig)) {
return Pair.of(true, generateDataLossRiskMessage(segmentName, false));
}
return NO_DATA_LOSS_RISK_RESULT;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index cbe6a45621e..6568c82d477 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -133,7 +133,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
if (isPauselessConsumptionEnabled) {
// For pauseless tables without dedup or partial upsert, repair segments
in error state
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
- context._repairErrorSegmentsForPartialUpsertOrDedup);
+ context._repairErrorSegmentsForPartialUpsertOrDedup,
_segmentAutoResetOnErrorAtValidation);
} else if (_segmentAutoResetOnErrorAtValidation) {
// Reset for pauseless tables is already handled in
repairSegmentsInErrorStateForPauselessConsumption method with
// additional checks for pauseless consumption
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java
new file mode 100644
index 00000000000..f1204f259db
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionConsumingTransitionFailureTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class PauselessDedupRealtimeIngestionConsumingTransitionFailureTest
+ extends PauselessDedupRealtimeIngestionSegmentCommitFailureTest {
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:"
+ _controllerConfig.getDataDir());
+ serverConf.setProperty("pinot.server.instance." +
HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
+ "true");
+ serverConf.setProperty("pinot.server.instance." +
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
+
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+ serverConf.setProperty("pinot.server.instance." +
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+ + getPauselessTableName(), new FailureInjectingTableConfig(false,
true, getExpectedMaxFailures()).toJson());
+ }
+
+ @Override
+ protected int getExpectedMaxFailures() {
+ return 2;
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java
new file mode 100644
index 00000000000..432ead97abd
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionConsumingTransitionFailureTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class PauselessRealtimeIngestionConsumingTransitionFailureTest
+ extends PauselessRealtimeIngestionSegmentCommitFailureTest {
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:"
+ _controllerConfig.getDataDir());
+ serverConf.setProperty("pinot.server.instance." +
HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
+ "true");
+ serverConf.setProperty("pinot.server.instance." +
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
+
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+ serverConf.setProperty("pinot.server.instance." +
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+ + getPauselessTableName(), new FailureInjectingTableConfig(false,
true, getExpectedMaxFailures()).toJson());
+ }
+
+ @Override
+ protected int getExpectedMaxFailures() {
+ return 2;
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 7ecb5e5c346..1cd5842fbd9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -33,6 +33,8 @@ import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableConfig;
+import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -49,7 +51,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingRealtimeTableDataManager.MAX_NUMBER_OF_FAILURES;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -75,6 +76,8 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
"true");
serverConf.setProperty("pinot.server.instance." +
CommonConstants.Server.TABLE_DATA_MANAGER_PROVIDER_CLASS,
"org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
+ serverConf.setProperty("pinot.server.instance." +
FailureInjectingTableDataManagerProvider.FAILURE_CONFIG_KEY + "."
+ + getPauselessTableName(), new FailureInjectingTableConfig(true,
false, getExpectedMaxFailures()).toJson());
}
@Override
@@ -115,7 +118,7 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
// create schema for pauseless table
- schema.setSchemaName(DEFAULT_TABLE_NAME);
+ schema.setSchemaName(getPauselessTableName());
addSchema(schema);
// add pauseless table
@@ -136,8 +139,12 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
addTableConfig(tableConfig);
String realtimeTableName = tableConfig.getTableName();
- TestUtils.waitForCondition(aVoid ->
getNumErrorSegmentsInEV(realtimeTableName) == MAX_NUMBER_OF_FAILURES, 600_000L,
- "Segments still not in error state");
+ TestUtils.waitForCondition(aVoid ->
getNumErrorSegmentsInEV(realtimeTableName) == getExpectedMaxFailures(),
+ 600_000L, "Segments still not in error state");
+ }
+
+ protected int getExpectedMaxFailures() {
+ return 10;
}
protected void setMaxSegmentCompletionTimeMillis() {
@@ -168,10 +175,10 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
@Test
public void testSegmentAssignment()
throws Exception {
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ String pauselessTableName =
TableNameBuilder.REALTIME.tableNameWithType(getPauselessTableName());
// 1) Capture which segments went into the ERROR state
- List<String> erroredSegments = getErrorSegmentsInEV(realtimeTableName);
+ List<String> erroredSegments = getSegmentsInEV(pauselessTableName,
SegmentStateModel.ERROR);
assertFalse(erroredSegments.isEmpty(), "No segments found in ERROR state,
expected at least one.");
// Let the RealtimeSegmentValidationManager run so it can fix up segments
@@ -179,30 +186,34 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
_controllerStarter.getRealtimeSegmentValidationManager().run();
// Wait until there are no ERROR segments in the ExternalView
- TestUtils.waitForCondition(aVoid ->
getErrorSegmentsInEV(realtimeTableName).isEmpty(), 600_000L,
- "Some segments are still in ERROR state after resetSegments()");
+ TestUtils.waitForCondition(aVoid -> getSegmentsInEV(pauselessTableName,
SegmentStateModel.ERROR).isEmpty(),
+ 600_000L, "Some segments are still in ERROR state after
resetSegments()");
+ // Segment in EV must not be offline for pauseless table
+ TestUtils.waitForCondition(aVoid -> getSegmentsInEV(pauselessTableName,
SegmentStateModel.OFFLINE).isEmpty(),
+ 30_000L, "Some segments are in OFFLINE state after resetSegments()");
// Finally compare metadata across your two tables
-
compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(realtimeTableName),
-
_helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2)));
+
compareZKMetadataForSegments(_helixResourceManager.getSegmentsZKMetadata(pauselessTableName),
+ _helixResourceManager.getSegmentsZKMetadata(
+
TableNameBuilder.REALTIME.tableNameWithType(getNonPauselessTableName())));
}
/**
- * Returns the list of segment names in ERROR state from the ExternalView of
the given table.
+ * Returns the list of segment names in the given state from the
ExternalView of the given table.
*/
- private List<String> getErrorSegmentsInEV(String realtimeTableName) {
+ private List<String> getSegmentsInEV(String realtimeTableName, String
status) {
ExternalView externalView = _helixResourceManager.getHelixAdmin()
.getResourceExternalView(_helixResourceManager.getHelixClusterName(),
realtimeTableName);
if (externalView == null) {
return List.of();
}
- List<String> errorSegments = new ArrayList<>();
+ List<String> segmentsToReturn = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry :
externalView.getRecord().getMapFields().entrySet()) {
- if (entry.getValue().containsValue(SegmentStateModel.ERROR)) {
- errorSegments.add(entry.getKey());
+ if (entry.getValue().containsValue(status)) {
+ segmentsToReturn.add(entry.getKey());
}
}
- return errorSegments;
+ return segmentsToReturn;
}
private void compareZKMetadataForSegments(List<SegmentZKMetadata>
segmentsZKMetadata,
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
index ee13b49e08b..fdc555a142c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
@@ -50,13 +50,15 @@ public class FailureInjectingRealtimeSegmentDataManager
extends RealtimeSegmentD
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator
consumerCoordinator,
ServerMetrics serverMetrics, boolean failCommit,
PartitionDedupMetadataManager partitionDedupMetadataManager,
- BooleanSupplier isTableReadyToConsumeData)
+ BooleanSupplier isTableReadyToConsumeData, boolean
failConsumingTransition)
throws AttemptsExceededException, RetriableOperationException {
// Pass through to the real parent constructor
super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir, indexLoadingConfig, schema,
llcSegmentName, consumerCoordinator, serverMetrics, null /* no
PartitionUpsertMetadataManager */,
partitionDedupMetadataManager, isTableReadyToConsumeData);
-
+ if (failConsumingTransition) {
+ throw new RuntimeException("Forced to fail the consuming transition");
+ }
_failCommit = failCommit;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 994800ea70d..b85c316e768 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -22,9 +22,9 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -38,16 +38,18 @@ import
org.apache.pinot.spi.utils.retry.RetriableOperationException;
public class FailureInjectingRealtimeTableDataManager extends
RealtimeTableDataManager {
- public static final int MAX_NUMBER_OF_FAILURES = 10;
private final AtomicInteger _numberOfFailures = new AtomicInteger(0);
+ private final FailureInjectingTableConfig _failureInjectingTableConfig;
public FailureInjectingRealtimeTableDataManager(Semaphore
segmentBuildSemaphore) {
- this(segmentBuildSemaphore, () -> true);
+ this(segmentBuildSemaphore, () -> true, null);
}
public FailureInjectingRealtimeTableDataManager(Semaphore
segmentBuildSemaphore,
- Supplier<Boolean> isServerReadyToServeQueries) {
+ Supplier<Boolean> isServerReadyToServeQueries,
+ @Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
super(segmentBuildSemaphore, isServerReadyToServeQueries);
+ _failureInjectingTableConfig = failureInjectingTableConfig;
}
@Override
@@ -57,12 +59,19 @@ public class FailureInjectingRealtimeTableDataManager
extends RealtimeTableDataM
PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isTableReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
- boolean addFailureToCommits =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
- if (addFailureToCommits && _numberOfFailures.getAndIncrement() >=
MAX_NUMBER_OF_FAILURES) {
+ boolean addFailureToCommits =
+ (_failureInjectingTableConfig != null) &&
(_failureInjectingTableConfig.isFailCommit());
+ if (addFailureToCommits && (_numberOfFailures.getAndIncrement() >=
_failureInjectingTableConfig.getMaxFailures())) {
addFailureToCommits = false;
}
+ boolean failConsumingTransition =
+ (_failureInjectingTableConfig != null) &&
(_failureInjectingTableConfig.isFailConsumingSegment());
+ if (failConsumingTransition && (_numberOfFailures.getAndIncrement()
+ >= _failureInjectingTableConfig.getMaxFailures())) {
+ failConsumingTransition = false;
+ }
return new FailureInjectingRealtimeSegmentDataManager(zkMetadata,
tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, consumerCoordinator,
_serverMetrics, addFailureToCommits,
- partitionDedupMetadataManager, isTableReadyToConsumeData);
+ partitionDedupMetadataManager, isTableReadyToConsumeData,
failConsumingTransition);
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java
new file mode 100644
index 00000000000..6c856014627
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableConfig.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.utils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+public class FailureInjectingTableConfig {
+ private final boolean _failCommit;
+ private final boolean _failConsumingSegment;
+ private final int _maxFailures;
+
+ @JsonCreator
+ public FailureInjectingTableConfig(@JsonProperty("failCommit") boolean
failCommit,
+ @JsonProperty("failConsumingSegment") boolean failConsumingSegment,
+ @JsonProperty("maxFailures") int maxFailures) {
+ _failCommit = failCommit;
+ _failConsumingSegment = failConsumingSegment;
+ _maxFailures = maxFailures;
+ }
+
+ public boolean isFailConsumingSegment() {
+ return _failConsumingSegment;
+ }
+
+ public boolean isFailCommit() {
+ return _failCommit;
+ }
+
+ public int getMaxFailures() {
+ return _maxFailures;
+ }
+
+ public String toJson() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FailureInjectingTableConfig{"
+ + "_failCommit=" + _failCommit
+ + ", _failConsumingSegment=" + _failConsumingSegment
+ + ", _maxFailures=" + _maxFailures
+ + '}';
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index 67cca949e91..dfa5cc32b43 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.integration.tests.realtime.utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.Cache;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -38,19 +40,25 @@ import
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
/**
* Default implementation of {@link TableDataManagerProvider}.
*/
public class FailureInjectingTableDataManagerProvider implements
TableDataManagerProvider {
+ public static final String FAILURE_CONFIG_KEY = "failure.config";
+
private InstanceDataManagerConfig _instanceDataManagerConfig;
private HelixManager _helixManager;
private SegmentLocks _segmentLocks;
private Semaphore _segmentBuildSemaphore;
+ private Map<String, FailureInjectingTableConfig>
_tableNameToFailureInjectingConfig;
+ private PinotConfiguration _pinotConfiguration;
@Override
public void init(InstanceDataManagerConfig instanceDataManagerConfig,
HelixManager helixManager,
@@ -60,6 +68,7 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
_segmentLocks = segmentLocks;
int maxParallelSegmentBuilds =
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
_segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new
Semaphore(maxParallelSegmentBuilds, true) : null;
+ _pinotConfiguration = instanceDataManagerConfig.getConfig();
}
@Override
@@ -85,8 +94,21 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
+ "configured the segmentstore uri. Configure the server
config %s",
StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
}
+ String failureConfigKey =
+ FAILURE_CONFIG_KEY + "." +
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
+ FailureInjectingTableConfig failureInjectingTableConfig = null;
+ if (_pinotConfiguration.containsKey(failureConfigKey)) {
+ try {
+ failureInjectingTableConfig =
+ new
ObjectMapper().readValue(_pinotConfiguration.getProperty(failureConfigKey),
+ FailureInjectingTableConfig.class);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
tableDataManager =
- new
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore,
isServerReadyToServeQueries);
+ new
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore,
isServerReadyToServeQueries,
+ failureInjectingTableConfig);
break;
default:
throw new IllegalStateException();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]