Jackie-Jiang commented on code in PR #8812:
URL: https://github.com/apache/pinot/pull/8812#discussion_r888226564


##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java:
##########
@@ -222,6 +244,10 @@ public void run()
         }
       }
       _segmentCreationTaskCountDownLatch.await();
+
+      if (_failure.get() != null) {

Review Comment:
   Consider calling `_executorService.shutdownNow()` here, once a failure has 
been recorded. See the other comment on this file for details.



##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java:
##########
@@ -194,6 +199,23 @@ public void run()
         filteredFiles.add(file);
       }
     }
+
+    // Sort files based on creation time.
+    Collections.sort(filteredFiles, new Comparator<String>() {

Review Comment:
   I suggest not addressing this TODO in this PR. The sort can make many calls 
(O(n log n)) to the PinotFS, which can cause problems when the file system is 
remote. We recently ran into the request rate limit for S3.
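
To illustrate the concern about call volume, here is a minimal sketch, not taken from this PR, of one way to keep the number of lookups at O(n): fetch each timestamp once, cache it, and let the comparator read only the cache. The class name `PrefetchedTimestampSort`, the method `sortByTimestamp`, and the `timestampLookup` parameter are hypothetical; `timestampLookup` stands in for whatever remote file-system call would supply the time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

public class PrefetchedTimestampSort {
  /**
   * Returns a copy of {@code files} sorted by timestamp.
   * {@code timestampLookup} is a hypothetical stand-in for a remote
   * file-system call; it is invoked at most once per distinct file.
   */
  public static List<String> sortByTimestamp(List<String> files,
      ToLongFunction<String> timestampLookup) {
    Map<String, Long> timestamps = new HashMap<>();
    for (String file : files) {
      // One lookup per distinct file: O(n) remote calls in total.
      if (!timestamps.containsKey(file)) {
        timestamps.put(file, timestampLookup.applyAsLong(file));
      }
    }
    List<String> sorted = new ArrayList<>(files);
    // The comparator only reads the local cache, so the O(n log n)
    // comparisons made by the sort never touch the file system.
    sorted.sort(Comparator.comparingLong((String file) -> timestamps.get(file)));
    return sorted;
  }
}
```

Even with this approach, n remote calls may still be too many for a large input directory, so deferring the change remains reasonable.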



##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java:
##########
@@ -281,7 +317,11 @@ private void submitSegmentGenTask(File localTempDir, URI inputFileURI, int seqId
           _outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
         }
       } catch (Exception e) {
-        LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+        String msg = "Failed to generate Pinot segment for file - " + inputFileURI.toString();
+        _failure.compareAndSet(null, new RuntimeException(msg, e));
+
+        // We have to decrement the latch by the number of pending tasks.

Review Comment:
   I suggest shutting down the executor service in the main thread after all 
the tasks are submitted. If we shut it down here while the main thread is still 
trying to submit another task, the submission can throw an exception.
   We can drain the latch here by checking the remaining count:
   ```
           long count = _segmentCreationTaskCountDownLatch.getCount();
           for (int i = 0; i < count; i++) {
             _segmentCreationTaskCountDownLatch.countDown();
           }
   ```
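
Putting the two suggestions together, the overall pattern might look like the sketch below. This is a self-contained illustration under assumptions, not the PR's code: the class `FailFastLatchSketch` and the method `runAll` are hypothetical, and plain `Runnable` tasks stand in for the segment generation tasks. The failing task records the first exception and drains the latch; only the main thread shuts the executor down.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class FailFastLatchSketch {
  /** Runs the tasks; returns the first recorded failure, or null if none failed. */
  public static Exception runAll(List<Runnable> tasks, int numThreads)
      throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    CountDownLatch latch = new CountDownLatch(tasks.size());
    AtomicReference<Exception> failure = new AtomicReference<>();
    try {
      for (Runnable task : tasks) {
        if (failure.get() != null) {
          break; // A task already failed; the latch has been drained.
        }
        executor.submit(() -> {
          try {
            task.run();
            latch.countDown();
          } catch (Exception e) {
            // Keep only the first failure.
            failure.compareAndSet(null, e);
            // Drain the latch so the waiting main thread wakes up.
            // Extra countDown() calls at zero are no-ops.
            long count = latch.getCount();
            for (long i = 0; i < count; i++) {
              latch.countDown();
            }
          }
        });
      }
      latch.await();
    } finally {
      // Shut down from the main thread only, after submission has stopped,
      // so no submit() call can race with the shutdown.
      executor.shutdownNow();
    }
    return failure.get();
  }
}
```

Because the worker threads never call `shutdownNow()` themselves, `submit()` cannot be rejected part-way through the submission loop.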



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
