This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 894665cdd31 controller: reuse error-state repair for non-pauseless;
delete redundant reingest method (#16811)
894665cdd31 is described below
commit 894665cdd319336c974c09b072f627521cfe1ff6
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Dec 3 22:54:47 2025 -0800
controller: reuse error-state repair for non-pauseless; delete redundant
reingest method (#16811)
- Reuse the error-state repair flow (formerly
repairSegmentsInErrorStateForPauselessConsumption(), now renamed to
repairSegmentsInErrorState()) to repair errored segments for both pauseless
and non-pauseless tables
- Remove reingestCommittingSegmentsForPauselessDisabled() and route its call
sites to the shared repair flow
- Keep a single reingestion entrypoint, reingestSegment(table, segment,
instances), with simple predicates (hasOnlineInstance,
maybeResetIfNotInProgress)
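- Update RealtimeSegmentValidationManager to invoke the shared repair flow for
every pauseless table, and for non-pauseless tables only when segment
auto-reset on error at validation is enabled (preserving the legacy opt-in
auto-reset behavior)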
- Adjust RealtimeSegmentValidationManagerTest expectations accordingly
Behavioral notes:
- Only re-ingests COMMITTING LLC segments that have start/end offsets and no
download URL, and only when all replicas are in ERROR and the segment is
ONLINE in IdealState
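- Otherwise resets errored segments that are not IN_PROGRESS so they are
fetched from deep store/peer; IN_PROGRESS segments are reset only when all of
their replicas are in ERROR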
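- Re-ingestion applies only to tables without dedup or partial upsert unless
explicitly allowed via allowRepairOfCommittingSegments(); for pauseless tables
where it is not allowed, the repair run is skipped and only the
unrecoverable-error gauge is updated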
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 76 +++++++++++++++------
.../RealtimeSegmentValidationManager.java | 23 ++++---
.../RealtimeSegmentValidationManagerTest.java | 79 +++++++++++++++++++---
3 files changed, 137 insertions(+), 41 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9efcc895d64..22743696c48 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2421,8 +2421,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!invalidSegments.isEmpty()) {
LOGGER.warn("Cannot commit segments that are not in CONSUMING state.
All consuming segments: {}, "
+ "provided segments to commit: {}. Ignoring all non-consuming
segments, sampling 10: {}",
- allConsumingSegments,
- segmentsToCommitStr,
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
+ allConsumingSegments, segmentsToCommitStr,
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
}
return validSegmentsToCommit;
}
@@ -2640,6 +2639,7 @@ public class PinotLLCRealtimeSegmentManager {
}
/**
+ * Repair segments in error state:
* Re-ingests segments that are in ERROR state in EV but ONLINE in IS with
no peer copy on any server. This method
* will call the server reingestSegment API on one of the alive servers that
are supposed to host that segment
* according to IdealState.
@@ -2648,18 +2648,17 @@ public class PinotLLCRealtimeSegmentManager {
* <p>
* If segment is in ERROR state in only few replicas but has download URL,
we instead trigger a segment reset
*
- * @param tableConfig The table config
- * @param segmentAutoResetOnErrorAtValidation flag to determine whether to
reset the error segments or not
+ * @param tableConfig The table config
*/
- public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig
tableConfig,
- boolean repairErrorSegmentsForPartialUpsertOrDedup, boolean
segmentAutoResetOnErrorAtValidation) {
+ public void repairSegmentsInErrorState(TableConfig tableConfig,
+ boolean repairErrorSegmentsForPartialUpsertOrDedup) {
+ boolean isPauselessTable =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
String realtimeTableName = tableConfig.getTableName();
// Fetch ideal state and external view
IdealState idealState = getIdealState(realtimeTableName);
ExternalView externalView =
_helixResourceManager.getTableExternalView(realtimeTableName);
if (externalView == null) {
- LOGGER.warn(
- "External view not found for table: {}, skipping repairing segments
in error state for pauseless consumption",
+ LOGGER.warn("External view not found for table: {}, skipping repairing
segments in error state",
realtimeTableName);
return;
}
@@ -2706,9 +2705,14 @@ public class PinotLLCRealtimeSegmentManager {
}
if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
- ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
+ if (isPauselessTable) {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+ 0);
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
0);
+ } else {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
+ }
return;
}
@@ -2717,12 +2721,26 @@ public class PinotLLCRealtimeSegmentManager {
segmentsInErrorStateInAtLeastOneReplica.size(),
segmentsInErrorStateInAtLeastOneReplica,
segmentsInErrorStateInAllReplicas.size(),
segmentsInErrorStateInAllReplicas, realtimeTableName);
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
- segmentsInErrorStateInAllReplicas.size());
-
boolean repairCommittingSegments =
allowRepairOfCommittingSegments(repairErrorSegmentsForPartialUpsertOrDedup,
tableConfig);
int segmentsInUnRecoverableState = 0;
+ if (isPauselessTable && !repairCommittingSegments) {
+ // We do not run reingestion for dedup and partial upsert tables in
pauseless as it can
+ // lead to data inconsistencies
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInErrorStateInAllReplicas.size());
+ return;
+ } else {
+ LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
+ }
+
+ if (isPauselessTable) {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+ segmentsInErrorStateInAllReplicas.size());
+ } else {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE,
+ segmentsInErrorStateInAllReplicas.size());
+ }
for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
@@ -2731,10 +2749,11 @@ public class PinotLLCRealtimeSegmentManager {
realtimeTableName);
continue;
}
- // We only consider segments that are in COMMITTING state for reingestion
- if (segmentZKMetadata.getStatus() == Status.COMMITTING &&
segmentsInErrorStateInAllReplicas.contains(
- segmentName)) {
+ Status status = segmentZKMetadata.getStatus();
+
+ // 1) For COMMITTING segments where all replicas are in ERROR, try
re-ingestion (subject to policy)
+ if (status == Status.COMMITTING &&
segmentsInErrorStateInAllReplicas.contains(segmentName)) {
if (!repairCommittingSegments) {
segmentsInUnRecoverableState += 1;
LOGGER.info(
@@ -2756,20 +2775,33 @@ public class PinotLLCRealtimeSegmentManager {
realtimeTableName);
continue;
}
-
try {
triggerReingestion(aliveServer, segmentName);
LOGGER.info("Successfully triggered re-ingestion for segment: {} on
server: {}", segmentName, aliveServer);
} catch (Exception e) {
- LOGGER.error("Failed to call reingestSegment for segment: {} on
server: {}", segmentName, aliveServer, e);
+ LOGGER.error("Failed to trigger re-ingestion for segment: {} on
server: {}", segmentName, aliveServer, e);
}
- } else if (segmentAutoResetOnErrorAtValidation) {
+ continue;
+ }
+
+ // 2) For IN_PROGRESS segments where all replicas are in ERROR, issue a
reset to allow ERROR->OFFLINE and retry
+ if (status == Status.IN_PROGRESS &&
segmentsInErrorStateInAllReplicas.contains(segmentName)) {
+ LOGGER.info("Resetting IN_PROGRESS segment: {} in table: {} since all
replicas are in ERROR state.",
+ segmentName, realtimeTableName);
+ _helixResourceManager.resetSegment(realtimeTableName, segmentName,
null);
+ continue;
+ }
+
+ // 3) For all other non-IN_PROGRESS statuses with any ERROR replica,
trigger a reset to download/refresh
+ if (status != Status.IN_PROGRESS) {
_helixResourceManager.resetSegment(realtimeTableName, segmentName,
null);
}
}
- _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
- ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInUnRecoverableState);
+ if (isPauselessTable) {
+ _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
+ ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT,
segmentsInUnRecoverableState);
+ }
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 6568c82d477..15bf560ef5a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -129,15 +129,18 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
LOGGER.info("Skipping segment-level validation for table: {}",
tableConfig.getTableName());
}
- boolean isPauselessConsumptionEnabled =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
- if (isPauselessConsumptionEnabled) {
- // For pauseless tables without dedup or partial upsert, repair segments
in error state
-
_llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig,
- context._repairErrorSegmentsForPartialUpsertOrDedup,
_segmentAutoResetOnErrorAtValidation);
- } else if (_segmentAutoResetOnErrorAtValidation) {
- // Reset for pauseless tables is already handled in
repairSegmentsInErrorStateForPauselessConsumption method with
- // additional checks for pauseless consumption
- _pinotHelixResourceManager.resetSegments(tableConfig.getTableName(),
null, true);
+ boolean isPauselessTable =
PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ if (isPauselessTable || _segmentAutoResetOnErrorAtValidation) {
+ // For realtime tables without dedup or partial upsert, repair segments
in error state.
+ // When pauseless consumption is disabled, this behavior remains gated by
+ // _segmentAutoResetOnErrorAtValidation to preserve legacy semantics.
+ _llcRealtimeSegmentManager.repairSegmentsInErrorState(tableConfig,
+ context._repairErrorSegmentsForPartialUpsertOrDedup);
+ } else {
+ LOGGER.debug(
+ "Skipping segment error repair for table {} because pauseless
consumption is disabled and "
+ + "_segmentAutoResetOnErrorAtValidation=false",
+ tableConfig.getTableName());
}
}
@@ -183,7 +186,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
// The table was previously paused due to exceeding resource
utilization, but the current status cannot be
// determined. To be safe, leave it as paused and once the status is
available take the correct action
LOGGER.warn("Resource utilization limit could not be determined for for
table: {}, and it is paused, leave it as "
- + "paused", tableNameWithType);
+ + "paused", tableNameWithType);
return false;
}
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType,
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
index 4c74c0bc1f3..fb511b1feed 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -23,8 +23,11 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -64,6 +67,52 @@ public class RealtimeSegmentValidationManagerTest {
_llcRealtimeSegmentManager, null, _controllerMetrics,
_storageQuotaChecker, _resourceUtilizationManager);
}
+  @Test
+  public void testReingestionCalledWhenPauselessDisabled() {
+    String rawTable = "testTable";
+    String tableName = rawTable + "_REALTIME";
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
+        .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+
+    // Force shouldEnsureConsuming=false by simulating admin pause
+    when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+    when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
+        .thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
+
+    _realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
+    // Pauseless is disabled here, so processTable only runs the repair flow when
+    // _segmentAutoResetOnErrorAtValidation is true (NOTE(review): presumably enabled by setUp's config -- confirm).
+    verify(_llcRealtimeSegmentManager, times(1))
+        .repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
+  }
+
+  @Test
+  public void testRepairCalledWhenPauselessEnabled() {
+    String rawTable = "testTable";
+    String tableName = rawTable + "_REALTIME";
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(rawTable)
+        .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+
+    // Enable pauseless on ingestion config. Pauseless tables always go through the shared error-repair flow,
+    // independent of _segmentAutoResetOnErrorAtValidation.
+    org.apache.pinot.spi.config.table.ingestion.IngestionConfig ingestionConfig =
+        new org.apache.pinot.spi.config.table.ingestion.IngestionConfig();
+    org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig streamIngestionConfig =
+        new org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig(
+            java.util.List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()));
+    streamIngestionConfig.setPauselessConsumptionEnabled(true);
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    tableConfig.setIngestionConfig(ingestionConfig);
+
+    // Force shouldEnsureConsuming=false by simulating admin pause (to avoid ensureAllPartitionsConsuming)
+    when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+    when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName))
+        .thenReturn(new PauseStatusDetails(true, null, PauseState.ReasonCode.ADMINISTRATIVE, null, null));
+
+    _realtimeSegmentValidationManager.processTable(tableName, new RealtimeSegmentValidationManager.Context());
+    // The shared repair flow (which may re-ingest or reset errored segments) must be invoked exactly once.
+    verify(_llcRealtimeSegmentManager, times(1))
+        .repairSegmentsInErrorState(eq(tableConfig), anyBoolean());
+  }
+
@AfterMethod
public void tearDown()
throws Exception {
@@ -77,21 +126,32 @@ public class RealtimeSegmentValidationManagerTest {
{true, PauseState.ReasonCode.ADMINISTRATIVE,
UtilizationChecker.CheckResult.PASS, false, false},
// Resource utilization exceeded and pause state is updated, should
return false
- {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.FAIL, false,
- false},
+ {
+ false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.FAIL,
+ false,
+ false
+ },
// Resource utilization is within limits but was previously paused due
to resource utilization,
// should return true
- {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.PASS, false,
- true},
+ {
+ true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.PASS, false,
+ true
+ },
// Resource utilization is STALE but was previously paused due to
resource utilization, should return false
- {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.UNDETERMINED,
- false, false},
+ {
+ true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+ UtilizationChecker.CheckResult.UNDETERMINED,
+ false, false
+ },
// Resource utilization is STALE but was not previously paused due to
resource utilization, should return true
- {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
UtilizationChecker.CheckResult.UNDETERMINED,
- false, true},
+ {
+ false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+ UtilizationChecker.CheckResult.UNDETERMINED,
+ false, true
+ },
// Resource utilization is within limits but was previously paused due
to storage quota exceeded,
// should return false
@@ -101,7 +161,8 @@ public class RealtimeSegmentValidationManagerTest {
{false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
UtilizationChecker.CheckResult.PASS, true, false},
// Storage quota within limits but was previously paused due to
storage quota exceeded, should return true
- {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
UtilizationChecker.CheckResult.PASS, false, true}};
+ {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
UtilizationChecker.CheckResult.PASS, false, true}
+ };
}
@Test(dataProvider = "testCases")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]