This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 91e929adf3 Allow RealtimeSegmentValidationManager to fix error segments for partial upsert and dedup tables. (#15987) 91e929adf3 is described below commit 91e929adf382dc3705aa4a89718f99cecdde5874 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Sat Jun 7 04:01:57 2025 +0530 Allow RealtimeSegmentValidationManager to fix error segments for partial upsert and dedup tables. (#15987) --- .../realtime/PinotLLCRealtimeSegmentManager.java | 10 ++++++++-- .../validation/RealtimeSegmentValidationManager.java | 20 +++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index a7b04e2766..012ef8d2d5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -2434,7 +2434,8 @@ public class PinotLLCRealtimeSegmentManager { * If segment is in ERROR state in only few replicas but has download URL, we instead trigger a segment reset * @param tableConfig The table config */ - public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig) { + public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableConfig, + boolean repairErrorSegmentsForPartialUpsertOrDedup) { String realtimeTableName = tableConfig.getTableName(); // Fetch ideal state and external view IdealState idealState = getIdealState(realtimeTableName); @@ -2516,7 +2517,7 @@ public class PinotLLCRealtimeSegmentManager { boolean isPartialUpsertEnabled = tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().getMode() == 
UpsertConfig.Mode.PARTIAL; boolean isDedupEnabled = tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled(); - if ((isPartialUpsertEnabled || isDedupEnabled)) { + if ((isPartialUpsertEnabled || isDedupEnabled) && !repairErrorSegmentsForPartialUpsertOrDedup) { // We do not run reingestion for dedup and partial upsert tables in pauseless as it can // lead to data inconsistencies _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, @@ -2525,6 +2526,11 @@ public class PinotLLCRealtimeSegmentManager { realtimeTableName); return; } else { + if ((isPartialUpsertEnabled || isDedupEnabled)) { + LOGGER.info( + "Repairing error segments in table: {} as repairErrorSegmentForPartialUpsertOrDedup is set to true", + realtimeTableName); + } _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 9fa236ecde..21d3d42802 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -67,6 +67,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public static final String OFFSET_CRITERIA = "offsetCriteria"; public static final String RUN_SEGMENT_LEVEL_VALIDATION = "runSegmentLevelValidation"; + public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP = + "repairErrorSegmentsForPartialUpsertOrDedup"; public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager 
llcRealtimeSegmentManager, @@ -96,6 +98,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea context._runSegmentLevelValidation = true; _lastSegmentLevelValidationRunTimeMs = currentTimeMs; } + context._repairErrorSegmentsForPartialUpsertOrDedup = + shouldRepairErrorSegmentsForPartialUpsertOrDedup(periodicTaskProperties); String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA); if (offsetCriteriaStr != null) { context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr); @@ -129,7 +133,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); if (isPauselessConsumptionEnabled) { // For pauseless tables without dedup or partial upsert, repair segments in error state - _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig); + _llcRealtimeSegmentManager.repairSegmentsInErrorStateForPauselessConsumption(tableConfig, + context._repairErrorSegmentsForPartialUpsertOrDedup); } else if (_segmentAutoResetOnErrorAtValidation) { // Reset for pauseless tables is already handled in repairSegmentsInErrorStateForPauselessConsumption method with // additional checks for pauseless consumption @@ -261,6 +266,18 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea return runValidation || timeThresholdMet; } + private boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(Properties periodicTaskProperties) { + return Optional.ofNullable(periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP)) + .map(value -> { + try { + return Boolean.parseBoolean(value); + } catch (Exception e) { + return false; + } + }) + .orElse(false); + } + @Override protected void nonLeaderCleanup(List<String> tableNamesWithType) { for (String tableNameWithType : tableNamesWithType) { @@ -288,6 
+305,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public static final class Context { private boolean _runSegmentLevelValidation; + private boolean _repairErrorSegmentsForPartialUpsertOrDedup; private OffsetCriteria _offsetCriteria; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org