sajjad-moradi commented on code in PR #14811: URL: https://github.com/apache/pinot/pull/14811#discussion_r1935978585
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,126 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, ForceCommitBatchConfig forceCommitBatchConfig) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = + getSegmentBatchList(idealState, targetConsumingSegments, forceCommitBatchConfig.getBatchSize()); + + _forceCommitExecutorService.submit( + () -> processBatchesSequentially(segmentBatchList, tableNameWithType, forceCommitBatchConfig)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType, + ForceCommitBatchConfig forceCommitBatchConfig) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit : segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit, + ForceCommitBatchConfig forceCommitBatchConfig) { + int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs(); + int batchStatusCheckTimeoutMs = 
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(); + + try { + Thread.sleep(batchStatusCheckIntervalMs); + } catch (InterruptedException e) { + LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e); + throw new RuntimeException(e); + } + + int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs; + RetryPolicy retryPolicy = + RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs); + int attemptCount = 0; + final Set<String>[] segmentsYetToBeCommitted = new Set[1]; + + try { + attemptCount = retryPolicy.attempt(() -> { + segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptsExceededException | RetriableOperationException e) { + String errorMsg = String.format( + "Exception occurred while waiting for the forceCommit of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", + segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(e); Review Comment: If an exception is thrown, there's no need to log. Add the errorMsg to the runtime exception. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1848,15 +1860,126 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, ForceCommitBatchConfig forceCommitBatchConfig) { IdealState idealState = getIdealState(tableNameWithType); Set<String> allConsumingSegments = findConsumingSegments(idealState); Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + + List<Set<String>> segmentBatchList = + getSegmentBatchList(idealState, targetConsumingSegments, forceCommitBatchConfig.getBatchSize()); + + _forceCommitExecutorService.submit( + () -> processBatchesSequentially(segmentBatchList, tableNameWithType, forceCommitBatchConfig)); + return targetConsumingSegments; } + private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType, + ForceCommitBatchConfig forceCommitBatchConfig) { + Set<String> prevBatch = null; + for (Set<String> segmentBatchToCommit : segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit, + ForceCommitBatchConfig forceCommitBatchConfig) { + int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs(); + int batchStatusCheckTimeoutMs = 
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(); + + try { + Thread.sleep(batchStatusCheckIntervalMs); + } catch (InterruptedException e) { + LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e); + throw new RuntimeException(e); + } + + int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs; + RetryPolicy retryPolicy = + RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs); + int attemptCount = 0; Review Comment: No need for this variable. Both AttemptsExceededException and RetriableOperationException have a getAttempts() method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org