Jackie-Jiang commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1931299015


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -213,6 +222,7 @@ public 
PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
         controllerConf.getDeepStoreRetryUploadParallelism()) : null;
     _deepStoreUploadExecutorPendingSegments =
         _isDeepStoreLLCSegmentUploadRetryEnabled ? 
ConcurrentHashMap.newKeySet() : null;
+    _forceCommitExecutorService = Executors.newFixedThreadPool(4);

Review Comment:
   Having a fixed-size pool could actually cause problems when there are 
multiple force commit requests. Since it is waiting most of the time, I'd 
actually suggest creating a single-thread pool per request, same as the last 
version. It is not on the query path, so the overhead should be fine.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState 
idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = 
idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {

Review Comment:
   Let's loop over `targetConsumingSegments` instead of ideal state. Ideal 
state should always contain `targetConsumingSegments` because they are 
extracted from ideal state.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {
 
   // Max time to wait for all LLC segments to complete committing their 
metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 
30_000L;
+  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;

Review Comment:
   Let's take the check interval from the rest API as well, because different 
use cases might want different intervals; we might also want to add a TIMEOUT 
and take that from the rest API too. The retry count can be calculated from 
the timeout and the interval.
   We can provide default values (e.g. 5s, 3min) for them in case they are not 
provided. IMO a 15s interval is too long, because it means we will wait at 
least 15s for each batch.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {

Review Comment:
   We can remove the queue when it becomes empty to avoid checking it again 
and again. You may use an iterator to remove the entry without an extra lookup.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize

Review Comment:
   Smart!



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {

Review Comment:
   Ignoring the interrupt could be risky (it can keep a long-running thread 
alive). Let's wrap it in a `RuntimeException` and throw it. We may also log an 
error when catching it.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState 
idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = 
idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
+        continue;
+      }
+      Map<String, String> instanceToStateMap = 
segmentNameToInstanceToStateMap.get(segmentName);
+      for (String instance : instanceToStateMap.keySet()) {
+        String state = instanceToStateMap.get(instance);
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          instanceToConsumingSegments.compute(instance, (key, value) -> {
+            if (value == null) {
+              value = new LinkedList<>();
+            }
+            value.add(segmentName);
+            return value;
+          });

Review Comment:
   ```suggestion
             instanceToConsumingSegments.computeIfAbsent(instance, k -> new 
LinkedList<>()).add(segmentName);
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {

Review Comment:
   We can reduce a lookup by
   ```suggestion
             if (!segmentsAdded.add(segmentName)) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};

Review Comment:
   ```suggestion
       final Set<String>[] segmentsYetToBeCommitted = new Set[1];
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, 
Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = 
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> 
processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, 
Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of 
segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> 
targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState 
idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = 
idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
+        continue;
+      }
+      Map<String, String> instanceToStateMap = 
segmentNameToInstanceToStateMap.get(segmentName);
+      for (String instance : instanceToStateMap.keySet()) {
+        String state = instanceToStateMap.get(instance);

Review Comment:
   Use `entrySet()` to reduce lookups



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to