yuanbenson commented on code in PR #9295:
URL: https://github.com/apache/pinot/pull/9295#discussion_r964301549


##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java:
##########
@@ -47,55 +36,8 @@ public SegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
     init(spec);
   }
 
-  @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
-    if (_spec.getPushJobSpec() == null) {
-      throw new RuntimeException("Missing PushJobSpec");
-    }
-  }
-
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output Pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
-    }
-    List<String> segmentUris = new ArrayList<>();
-    for (String file : files) {
-      URI uri = URI.create(file);
-      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils
-            .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(),
-                _spec.getPushJobSpec().getSegmentUriSuffix());
-        segmentUris.add(updatedURI.toString());
-      }
-    }
-    try {
-      SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
-    } catch (RetriableOperationException | AttemptsExceededException e) {
-      throw new RuntimeException(e);
-    }
+  public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
+      throws AttemptsExceededException, RetriableOperationException {
+    SegmentPushUtils.sendSegmentUris(_spec, new ArrayList<>(segmentsUriToTarPathMap.keySet()));

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to