This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de61de8bc Remove `recreateDeletedConsumingSegment` flag from RealtimeSegmentValidationManager (#14024)
2de61de8bc is described below

commit 2de61de8bcf4c5befce3404543f25025cbbf7cbd
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Thu Sep 19 18:44:55 2024 +0530

    Remove `recreateDeletedConsumingSegment` flag from RealtimeSegmentValidationManager (#14024)
    
    * Remove recreateDeletedConsumingSegment flag
    
    In favour of always recreating deleted consuming segments if table is not paused.
    
    * handle resumption upon storage quota getting freed up
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 14 ++---
 .../RealtimeSegmentValidationManager.java          | 63 ++++++++++------------
 .../PinotLLCRealtimeSegmentManagerTest.java        |  5 +-
 3 files changed, 35 insertions(+), 47 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d799000ed3..7a459d7ddb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -912,11 +912,9 @@ public class PinotLLCRealtimeSegmentManager {
    * Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status
    * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE.
    * If so, it should create a new CONSUMING segment for the partition.
-   * (this operation is done only if @param recreateDeletedConsumingSegment is set to true,
-   * which means it's manually triggered by admin not by automatic periodic task)
    */
   public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig,
-      boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
+      OffsetCriteria offsetCriteria) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -938,7 +936,7 @@ public class PinotLLCRealtimeSegmentManager {
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
         streamConfig.setOffsetCriteria(originalOffsetCriteria);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
-            recreateDeletedConsumingSegment, offsetCriteria);
+            offsetCriteria);
       } else {
         LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
             realtimeTableName, isTableEnabled, isTablePaused);
@@ -1158,8 +1156,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState,
-      List<PartitionGroupMetadata> newPartitionGroupMetadataList, boolean recreateDeletedConsumingSegment,
-      OffsetCriteria offsetCriteria) {
+      List<PartitionGroupMetadata> newPartitionGroupMetadataList, OffsetCriteria offsetCriteria) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -1275,7 +1272,7 @@ public class PinotLLCRealtimeSegmentManager {
                 instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
-              if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
+              if (latestSegmentZKMetadata.getStatus().isCompleted()
                   && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) {
                 // If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
                 // Create a new IN_PROGRESS segment in PROPERTYSTORE,
@@ -1737,7 +1734,6 @@ public class PinotLLCRealtimeSegmentManager {
 
     // trigger realtime segment validation job to resume consumption
     Map<String, String> taskProperties = new HashMap<>();
-    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
     if (offsetCriteria != null) {
       taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
     }
@@ -1749,7 +1745,7 @@ public class PinotLLCRealtimeSegmentManager {
             + "endpoint in a few moments to double check.", new Timestamp(System.currentTimeMillis()).toString());
   }
 
-  private IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
+  public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
       PauseState.ReasonCode reasonCode, @Nullable String comment) {
     PauseState pauseState = new PauseState(pause, reasonCode, comment,
         new Timestamp(System.currentTimeMillis()).toString());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 856d88c226..b8460a406a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
 
-  public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
   public static final String OFFSET_CRITERIA = "offsetCriteria";
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
@@ -87,8 +86,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       context._runSegmentLevelValidation = true;
       _lastSegmentLevelValidationRunTimeMs = currentTimeMs;
     }
-    context._recreateDeletedConsumingSegment =
-        Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
     String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
     if (offsetCriteriaStr != null) {
       context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
@@ -113,44 +110,41 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       runSegmentLevelValidation(tableConfig, streamConfig);
     }
 
-    if (shouldEnsureConsuming(tableNameWithType, context)) {
-      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
-          context._recreateDeletedConsumingSegment, context._offsetCriteria);
+    if (shouldEnsureConsuming(tableNameWithType)) {
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria);
     }
   }
 
-  private boolean shouldEnsureConsuming(String tableNameWithType, Context context) {
-    // Keeps the table paused/unpaused based pause validations.
-    // Skips updating the pause state if table is paused by admin
-    PauseState pauseState = computePauseState(tableNameWithType);
-    if (!pauseState.isPaused()) {
-      boolean unPausedUponStorageWithinQuota =
-        pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
-      if (unPausedUponStorageWithinQuota) {
-        // recreate consuming segments if table is resumed upon the table storage getting within quota limit
-        context._recreateDeletedConsumingSegment = true;
-      }
-    }
-    return !pauseState.isPaused();
-  }
-
-  private PauseState computePauseState(String tableNameWithType) {
+  /**
+   *
+   * Updates the table paused state based on pause validations (e.g. storage quota being exceeded).
+   * Skips updating the pause state if table is paused by admin.
+   * Returns true if table is not paused
+   */
+  private boolean shouldEnsureConsuming(String tableNameWithType) {
     PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
     boolean isTablePaused = pauseStatus.getPauseFlag();
     // if table is paused by admin then don't compute
-    if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
-      // if quota breach and pause flag is not in sync, update the IS
-      if (isQuotaExceeded != isTablePaused) {
-        String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
-        pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
-            PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
-            isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits");
-      }
+    if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) {
+      return false;
+    }
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
+    if (isQuotaExceeded == isTablePaused) {
+      return !isTablePaused;
+    }
+    // if quota breach and pause flag is not in sync, update the IS
+    if (isQuotaExceeded) {
+      String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
+      // as quota is breached pause the consumption right away
+      _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
+          "Storage quota of " + storageQuota + " exceeded.");
+    } else {
+      // as quota limit is being honored, unset the pause state and allow consuming segment recreation.
+      _llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false,
+          PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within quota limits");
     }
-    return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(),
-        pauseStatus.getTimestamp());
+    return !isQuotaExceeded;
   }
 
   private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {
@@ -204,7 +198,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
 
   public static final class Context {
     private boolean _runSegmentLevelValidation;
-    private boolean _recreateDeletedConsumingSegment;
     private OffsetCriteria _offsetCriteria;
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 7127294fee..435303f5e9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -878,8 +878,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
     try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
-          null);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null);
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -1146,7 +1145,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
+          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to