noob-se7en commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1920652568
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1856,114 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
- @Nullable String segmentsToCommit) {
+ @Nullable String segmentsToCommit, int batchSize) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments =
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
- sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+ List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ try {
+ for (Set<String> segmentBatchToCommit : segmentBatchList) {
+ executorService.submit(() -> executeBatch(tableNameWithType, segmentBatchToCommit));
+ }
+ } finally {
+ executorService.shutdown();
+ }
+
return targetConsumingSegments;
}
+ private void executeBatch(String tableNameWithType, Set<String> segmentBatchToCommit) {
+ sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+
+ try {
+ Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
Review Comment:
This sleep is here to save ZooKeeper calls: at this point in the code, it is
almost certain that none of the segments have been committed yet.
(We need a new retry policy with an initial start delay.)
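
A minimal sketch of what such a policy could look like, assuming a fixed interval between attempts. `InitialDelayRetry`, `attempt`, and the parameter names are hypothetical and are not an existing Pinot class or API:

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of Pinot): a fixed-interval retry that
 * waits once before the first attempt, so no status check is issued
 * while the operation is almost certainly still in progress.
 */
final class InitialDelayRetry {
  private InitialDelayRetry() {
  }

  /**
   * Sleeps for initialDelayMs, then calls check up to maxAttempts times,
   * sleeping intervalMs after each failed attempt except the last one.
   *
   * @return true if check returned true within maxAttempts, false otherwise
   */
  static boolean attempt(Callable<Boolean> check, int maxAttempts, long initialDelayMs, long intervalMs)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    // Initial start delay: skip the checks that would almost certainly fail.
    Thread.sleep(initialDelayMs);
    for (int i = 1; i <= maxAttempts; i++) {
      if (Boolean.TRUE.equals(check.call())) {
        return true;
      }
      if (i < maxAttempts) {
        Thread.sleep(intervalMs);
      }
    }
    return false;
  }
}
```

With something along these lines, the explicit `Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS)` in `executeBatch` could be folded into the policy's initial delay, and the status check passed in as the `check` callable.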
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]