Jackie-Jiang commented on code in PR #14811: URL: https://github.com/apache/pinot/pull/14811#discussion_r1931299015
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan controllerConf.getDeepStoreRetryUploadParallelism()) : null; _deepStoreUploadExecutorPendingSegments = _isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null; + _forceCommitExecutorService = Executors.newFixedThreadPool(4); Review Comment: Having a fixed-size pool could actually cause problems when there are multiple force commit requests. Since it is waiting most of the time, I'd actually suggest making a single-thread pool for each request, same as the last version. It is not on the query path, so the overhead should be fine. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { 
+ Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize + // forceCommit across max servers + for (Queue<String> queue : instanceToConsumingSegments.values()) { + if (!queue.isEmpty()) { + segmentsRemaining = true; + String segmentName = 
queue.poll(); + // there might be a segment replica hosted on + // another instance added before + if (segmentsAdded.contains(segmentName)) { + continue; + } + currentBatch.add(segmentName); + segmentsAdded.add(segmentName); + if (currentBatch.size() == batchSize) { + segmentBatchList.add(currentBatch); + currentBatch = new HashSet<>(); + } + } + } + } + + if (!currentBatch.isEmpty()) { + segmentBatchList.add(currentBatch); + } + return segmentBatchList; + } + + @VisibleForTesting + Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState, + Set<String> targetConsumingSegments) { + Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>(); + + Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields(); + for (String segmentName : segmentNameToInstanceToStateMap.keySet()) { + if (!targetConsumingSegments.contains(segmentName)) { Review Comment: Let's loop over `targetConsumingSegments` instead of ideal state. Ideal state should always contain `targetConsumingSegments` because they are extracted from ideal state. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager { // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller. private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L; + private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000; Review Comment: Let's take the check interval also from the rest API because different use case might want different interval; we might also want to add a TIMEOUT and also take that from rest API. The retry count can be calculated from timeout and interval. We can provide default values (e.g. 5s, 3min) for them in case they are not provided. 
IMO 15s interval is too long because it means for each batch we will wait at least 15s. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + 
segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize + // forceCommit across max servers + for (Queue<String> queue : instanceToConsumingSegments.values()) { + if (!queue.isEmpty()) { Review Comment: We can remove the queue when it is empty to avoid checking it again and again. 
You may use iterator to remove entry without extra lookup ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize Review Comment: Smart! 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { Review Comment: Ignoring interrupt could be risky (holding a long running thread). Let's wrap it as a `RuntimeException` and throw it. 
We may log an error when catching it ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + segmentsYetToBeCommitted[0] = 
getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize + // forceCommit across max servers + for (Queue<String> queue : instanceToConsumingSegments.values()) { + if (!queue.isEmpty()) { + segmentsRemaining = true; + String segmentName = queue.poll(); + // there might be a segment replica hosted on + // another instance added before + if (segmentsAdded.contains(segmentName)) { + continue; + } + currentBatch.add(segmentName); + segmentsAdded.add(segmentName); + if (currentBatch.size() == batchSize) { + segmentBatchList.add(currentBatch); + currentBatch = new HashSet<>(); + } + } + } + } + + if (!currentBatch.isEmpty()) { + segmentBatchList.add(currentBatch); + } + return segmentBatchList; + } + + @VisibleForTesting + Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState, + Set<String> targetConsumingSegments) { + Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>(); + + Map<String, Map<String, String>> 
segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields(); + for (String segmentName : segmentNameToInstanceToStateMap.keySet()) { + if (!targetConsumingSegments.contains(segmentName)) { + continue; + } + Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName); + for (String instance : instanceToStateMap.keySet()) { + String state = instanceToStateMap.get(instance); + if (state.equals(SegmentStateModel.CONSUMING)) { + instanceToConsumingSegments.compute(instance, (key, value) -> { + if (value == null) { + value = new LinkedList<>(); + } + value.add(segmentName); + return value; + }); Review Comment: ```suggestion instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> 
prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize + // forceCommit across max servers + for (Queue<String> queue : instanceToConsumingSegments.values()) { + if (!queue.isEmpty()) { + segmentsRemaining = true; + String segmentName = queue.poll(); + // there 
might be a segment replica hosted on + // another instance added before + if (segmentsAdded.contains(segmentName)) { Review Comment: We can reduce a lookup by ```suggestion if (!segmentsAdded.add(segmentName)) { ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final 
Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()}; Review Comment: ```suggestion final Set<String>[] segmentsYetToBeCommitted = new Set[1]; ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, int batchSize) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + + _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit: segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) { + + try { + Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS); + } catch (InterruptedException ignored) { + } + + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[]{new 
HashSet<>()}; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, + int batchSize) { + Map<String, Queue<String>> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + List<Set<String>> segmentBatchList = new ArrayList<>(); + Set<String> currentBatch = new HashSet<>(); + Set<String> segmentsAdded = new HashSet<>(); + boolean segmentsRemaining = true; + + while (segmentsRemaining) { + segmentsRemaining = false; + // pick segments in round-robin fashion to parallelize + // forceCommit across max servers + for (Queue<String> queue : instanceToConsumingSegments.values()) { + if (!queue.isEmpty()) { + segmentsRemaining = true; + String segmentName = queue.poll(); + // there might be a segment replica hosted on + // another instance added before + if (segmentsAdded.contains(segmentName)) { + continue; + } + currentBatch.add(segmentName); + segmentsAdded.add(segmentName); + if (currentBatch.size() == batchSize) { + segmentBatchList.add(currentBatch); + currentBatch = new HashSet<>(); + } + } + } + } + + if (!currentBatch.isEmpty()) { + segmentBatchList.add(currentBatch); + } + return segmentBatchList; + } + + @VisibleForTesting + Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState, + Set<String> targetConsumingSegments) { + Map<String, 
Queue<String>> instanceToConsumingSegments = new HashMap<>(); + + Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields(); + for (String segmentName : segmentNameToInstanceToStateMap.keySet()) { + if (!targetConsumingSegments.contains(segmentName)) { + continue; + } + Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName); + for (String instance : instanceToStateMap.keySet()) { + String state = instanceToStateMap.get(instance); Review Comment: Use `entrySet()` to reduce lookup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org