yuanbenson commented on code in PR #9295: URL: https://github.com/apache/pinot/pull/9295#discussion_r964301549
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java:
##########
@@ -47,55 +36,8 @@ public SegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
     init(spec);
   }
 
-  @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
-    if (_spec.getPushJobSpec() == null) {
-      throw new RuntimeException("Missing PushJobSpec");
-    }
-  }
-
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output Pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
-    }
-    List<String> segmentUris = new ArrayList<>();
-    for (String file : files) {
-      URI uri = URI.create(file);
-      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils
-            .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(),
-                _spec.getPushJobSpec().getSegmentUriSuffix());
-        segmentUris.add(updatedURI.toString());
-      }
-    }
-    try {
-      SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
-    } catch (RetriableOperationException | AttemptsExceededException e) {
-      throw new RuntimeException(e);
-    }
+  public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
+      throws AttemptsExceededException, RetriableOperationException {
+    SegmentPushUtils.sendSegmentUris(_spec, new ArrayList<>(segmentsUriToTarPathMap.keySet()));

Review Comment:
   same here

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org