snleee commented on code in PR #9825: URL: https://github.com/apache/pinot/pull/9825#discussion_r1040096535
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java: ########## @@ -276,6 +300,89 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig } } + private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI, + List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult) + throws Exception { + String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); + pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); + pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); + pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); + pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + + SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); + + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + File tarFile = new File(outputSegmentTarURI); + String segmentName = segmentConversionResult.getSegmentName(); + String tableNameWithType = segmentConversionResult.getTableNameWithType(); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, + uploadURL, tarFile); + } catch (RetriableOperationException | AttemptsExceededException e) { Review Comment: Why do we need to catch specific exceptions and wrap it with `RuntimeException` here? Our top caller function's definition: `executeTask() throw Exception` ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java: ########## @@ -276,6 +300,89 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig } } + private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI, + List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult) + throws Exception { + String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); + pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); + pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); + pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); + pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + + SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); + + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + File tarFile = new File(outputSegmentTarURI); + String segmentName = segmentConversionResult.getSegmentName(); + String tableNameWithType = segmentConversionResult.getTableNameWithType(); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, + uploadURL, tarFile); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case METADATA: + if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { + Map<String, String> segmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, + new String[]{outputSegmentTarURI.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); + } catch (RetriableOperationException | AttemptsExceededException e) { Review Comment: same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org