Jackie-Jiang commented on code in PR #9286:
URL: https://github.com/apache/pinot/pull/9286#discussion_r956647000


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -198,7 +198,10 @@ protected IntermediateResultsBlock mergeResults()
     while (numBlocksMerged < _numOperators) {
      IntermediateResultsBlock blockToMerge =
          _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-      if (blockToMerge == null) {
+      // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even

Review Comment:
   (minor) Good catch. We may save an extra time read by checking whether the wait time (`endTimeMs - System.currentTimeMillis()`) is positive before polling the next block.
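   As a standalone sketch of the suggested pattern (not the actual Pinot code; the class and queue contents below are made up for illustration), checking the remaining wait time once before polling could look like this:

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   public class TimedPollSketch {
     public static void main(String[] args) throws InterruptedException {
       BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
       blockingQueue.offer("block-1");
       long endTimeMs = System.currentTimeMillis() + 100;

       while (true) {
         // Compute the remaining wait time once; bail out before polling if the deadline has already passed.
         long waitTimeMs = endTimeMs - System.currentTimeMillis();
         if (waitTimeMs <= 0) {
           System.out.println("Timed out before polling the next block");
           break;
         }
         String blockToMerge = blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS);
         if (blockToMerge == null) {
           System.out.println("Timed out while waiting for the next block");
           break;
         }
         System.out.println("Merged " + blockToMerge);
       }
     }
   }
   ```

   Reusing the computed `waitTimeMs` both for the bail-out check and for `poll` avoids reading the clock a second time.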



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java:
##########
@@ -197,6 +205,13 @@ protected void processSegments(int taskIndex) {
     }
   }
 
+  // Check for thread interruption, every time after merging 10_000 keys
+  private void checkMergePhaseInterruption(long mergedKeys) {
+    if (mergedKeys % DocIdSetPlanNode.MAX_DOC_PER_CALL == 0 && Thread.interrupted()) {

Review Comment:
   (minor) Let's use a separate constant for this instead of reusing `DocIdSetPlanNode.MAX_DOC_PER_CALL`
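   A minimal standalone sketch of what that could look like, assuming a hypothetical dedicated constant (the name, value, and exception type are placeholders, not from the PR):

   ```java
   public class MergeInterruptionCheckSketch {
     // Hypothetical dedicated constant, decoupled from DocIdSetPlanNode.MAX_DOC_PER_CALL.
     private static final int INTERRUPTION_CHECK_INTERVAL = 10_000;

     // Check for thread interruption once every INTERRUPTION_CHECK_INTERVAL merged keys.
     static void checkMergePhaseInterruption(long mergedKeys) {
       if (mergedKeys % INTERRUPTION_CHECK_INTERVAL == 0 && Thread.interrupted()) {
         throw new RuntimeException("Interrupted while merging group-by results");
       }
     }

     public static void main(String[] args) {
       long mergedKeys = 0;
       for (int i = 0; i < 50_000; i++) {
         mergedKeys++;
         checkMergePhaseInterruption(mergedKeys);
       }
       System.out.println("Merged " + mergedKeys + " keys without interruption");
     }
   }
   ```

   Keeping the check interval as its own constant lets it evolve independently of the doc-id batch size.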



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java:
##########
@@ -165,6 +167,8 @@ protected void processSegments(int taskIndex) {
         // Merge aggregation group-by result.
        // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
        Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+        // Count the number of merged keys
+        long mergedKeys = 0;

Review Comment:
   (minor) This can be `int`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -142,7 +142,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
     _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
-              + "The rate limit interval for streamed download-untar is {} ms",
+              + "The rate limit interval for streamed download-untar is {} 
bytes",

Review Comment:
   This is still confusing. IIUC, this should be `The rate limit for streamed download-untar is {} bytes/s`
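   For illustration only, assuming SLF4J on the classpath and a made-up variable standing in for the configured rate limit, the suggested wording would log like this:

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class DownloadUntarLogSketch {
     private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUntarLogSketch.class);

     public static void main(String[] args) {
       // Placeholder name and value; the real rate limit is configured on the table data manager.
       long rateLimitBytesPerSecond = 100 * 1024 * 1024;
       LOGGER.info("Using streamed download-untar for segment download! "
           + "The rate limit for streamed download-untar is {} bytes/s", rateLimitBytesPerSecond);
     }
   }
   ```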



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
