This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2de61de8bc Remove `recreateDeletedConsumingSegment` flag from RealtimeSegmentValidationManager (#14024) 2de61de8bc is described below commit 2de61de8bcf4c5befce3404543f25025cbbf7cbd Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Thu Sep 19 18:44:55 2024 +0530 Remove `recreateDeletedConsumingSegment` flag from RealtimeSegmentValidationManager (#14024) * Remove recreateDeletedConsumingSegment flag In favour of always recreating deleted consuming segments if table is not paused. * handle resumption upon storage quota getting freed up --- .../realtime/PinotLLCRealtimeSegmentManager.java | 14 ++--- .../RealtimeSegmentValidationManager.java | 63 ++++++++++------------ .../PinotLLCRealtimeSegmentManagerTest.java | 5 +- 3 files changed, 35 insertions(+), 47 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index d799000ed3..7a459d7ddb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -912,11 +912,9 @@ public class PinotLLCRealtimeSegmentManager { * Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE. * If so, it should create a new CONSUMING segment for the partition. - * (this operation is done only if @param recreateDeletedConsumingSegment is set to true, - * which means it's manually triggered by admin not by automatic periodic task) */ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, - boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) { + OffsetCriteria offsetCriteria) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); @@ -938,7 +936,7 @@ public class PinotLLCRealtimeSegmentManager { getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); streamConfig.setOffsetCriteria(originalOffsetCriteria); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, - recreateDeletedConsumingSegment, offsetCriteria); + offsetCriteria); } else { LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", realtimeTableName, isTableEnabled, isTablePaused); @@ -1158,8 +1156,7 @@ public class PinotLLCRealtimeSegmentManager { */ @VisibleForTesting IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState, - List<PartitionGroupMetadata> newPartitionGroupMetadataList, boolean recreateDeletedConsumingSegment, - OffsetCriteria offsetCriteria) { + List<PartitionGroupMetadata> newPartitionGroupMetadataList, OffsetCriteria offsetCriteria) { String realtimeTableName = tableConfig.getTableName(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -1275,7 +1272,7 @@ public class PinotLLCRealtimeSegmentManager { instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { - if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() + if (latestSegmentZKMetadata.getStatus().isCompleted() && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) { // If we get here, that means in IdealState, the latest segment has all replicas ONLINE. // Create a new IN_PROGRESS segment in PROPERTYSTORE, @@ -1737,7 +1734,6 @@ public class PinotLLCRealtimeSegmentManager { // trigger realtime segment validation job to resume consumption Map<String, String> taskProperties = new HashMap<>(); - taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true"); if (offsetCriteria != null) { taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria); } @@ -1749,7 +1745,7 @@ public class PinotLLCRealtimeSegmentManager { + "endpoint in a few moments to double check.", new Timestamp(System.currentTimeMillis()).toString()); } - private IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause, + public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause, PauseState.ReasonCode reasonCode, @Nullable String comment) { PauseState pauseState = new PauseState(pause, reasonCode, comment, new Timestamp(System.currentTimeMillis()).toString()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 856d88c226..b8460a406a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -58,7 +58,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final int _segmentLevelValidationIntervalInSeconds; private long _lastSegmentLevelValidationRunTimeMs = 0L; - public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment"; public static final String OFFSET_CRITERIA = "offsetCriteria"; public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, @@ -87,8 +86,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea context._runSegmentLevelValidation = true; _lastSegmentLevelValidationRunTimeMs = currentTimeMs; } - context._recreateDeletedConsumingSegment = - Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY)); String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA); if (offsetCriteriaStr != null) { context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr); @@ -113,44 +110,41 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea runSegmentLevelValidation(tableConfig, streamConfig); } - if (shouldEnsureConsuming(tableNameWithType, context)) { - _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, - context._recreateDeletedConsumingSegment, context._offsetCriteria); + if (shouldEnsureConsuming(tableNameWithType)) { + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria); } } - private boolean shouldEnsureConsuming(String tableNameWithType, Context context) { - // Keeps the table paused/unpaused based pause validations. - // Skips updating the pause state if table is paused by admin - PauseState pauseState = computePauseState(tableNameWithType); - if (!pauseState.isPaused()) { - boolean unPausedUponStorageWithinQuota = - pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED); - if (unPausedUponStorageWithinQuota) { - // recreate consuming segments if table is resumed upon the table storage getting within quota limit - context._recreateDeletedConsumingSegment = true; - } - } - return !pauseState.isPaused(); - } - - private PauseState computePauseState(String tableNameWithType) { + /** + * + * Updates the table paused state based on pause validations (e.g. storage quota being exceeded). + * Skips updating the pause state if table is paused by admin. + * Returns true if table is not paused + */ + private boolean shouldEnsureConsuming(String tableNameWithType) { PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType); boolean isTablePaused = pauseStatus.getPauseFlag(); // if table is paused by admin then don't compute - if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) { - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); - // if quota breach and pause flag is not in sync, update the IS - if (isQuotaExceeded != isTablePaused) { - String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA"; - pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, - PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, - isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits"); - } + if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) { + return false; + } + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); + if (isQuotaExceeded == isTablePaused) { + return !isTablePaused; + } + // if quota breach and pause flag is not in sync, update the IS + if (isQuotaExceeded) { + String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA"; + // as quota is breached pause the consumption right away + _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, + "Storage quota of " + storageQuota + " exceeded."); + } else { + // as quota limit is being honored, unset the pause state and allow consuming segment recreation. + _llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false, + PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within quota limits"); } - return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(), - pauseStatus.getTimestamp()); + return !isQuotaExceeded; } private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) { @@ -204,7 +198,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public static final class Context { private boolean _runSegmentLevelValidation; - private boolean _recreateDeletedConsumingSegment; private OffsetCriteria _offsetCriteria; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 7127294fee..435303f5e9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -878,8 +878,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Expected } try { - segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false, - null); + segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null); fail(); } catch (IllegalStateException e) { // Expected @@ -1146,7 +1145,7 @@ public class PinotLLCRealtimeSegmentManagerTest { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null); + getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org