sajjad-moradi commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1935978585


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,126 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, ForceCommitBatchConfig 
forceCommitBatchConfig) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList =
+        getSegmentBatchList(idealState, targetConsumingSegments, 
forceCommitBatchConfig.getBatchSize());
+
+    _forceCommitExecutorService.submit(
+        () -> processBatchesSequentially(segmentBatchList, tableNameWithType, 
forceCommitBatchConfig));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit : segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, 
forceCommitBatchConfig);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    int batchStatusCheckIntervalMs = 
forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
+    int batchStatusCheckTimeoutMs = 
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();
+
+    try {
+      Thread.sleep(batchStatusCheckIntervalMs);
+    } catch (InterruptedException e) {
+      LOGGER.error("Exception occurred while waiting for the forceCommit of 
segments: {}", segmentBatchToCommit, e);
+      throw new RuntimeException(e);
+    }
+
+    int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs 
- 1) / batchStatusCheckIntervalMs;
+    RetryPolicy retryPolicy =
+        RetryPolicies.fixedDelayRetryPolicy(maxAttempts, 
batchStatusCheckIntervalMs);
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[1];
+
+    try {
+      attemptCount = retryPolicy.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while waiting for the forceCommit of segments: 
%s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);

Review Comment:
   Since the exception is rethrown here, there's no need to log it as well. 
Add the errorMsg to the runtime exception instead.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,126 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, ForceCommitBatchConfig 
forceCommitBatchConfig) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList =
+        getSegmentBatchList(idealState, targetConsumingSegments, 
forceCommitBatchConfig.getBatchSize());
+
+    _forceCommitExecutorService.submit(
+        () -> processBatchesSequentially(segmentBatchList, tableNameWithType, 
forceCommitBatchConfig));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit : segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, 
forceCommitBatchConfig);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    int batchStatusCheckIntervalMs = 
forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
+    int batchStatusCheckTimeoutMs = 
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();
+
+    try {
+      Thread.sleep(batchStatusCheckIntervalMs);
+    } catch (InterruptedException e) {
+      LOGGER.error("Exception occurred while waiting for the forceCommit of 
segments: {}", segmentBatchToCommit, e);
+      throw new RuntimeException(e);
+    }
+
+    int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs 
- 1) / batchStatusCheckIntervalMs;
+    RetryPolicy retryPolicy =
+        RetryPolicies.fixedDelayRetryPolicy(maxAttempts, 
batchStatusCheckIntervalMs);
+    int attemptCount = 0;

Review Comment:
   There's no need for the attemptCount variable. Both AttemptsExceededException 
and RetriableOperationException have a getAttempts() method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to