snleee commented on code in PR #9825: URL: https://github.com/apache/pinot/pull/9825#discussion_r1039916803
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java: ########## @@ -276,6 +300,84 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig } } + private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI, + List<Header> headers, List<NameValuePair> parameters) throws Exception { + String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); + pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); + pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); + pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); + pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + + SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); + + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) { + SegmentPushUtils.pushSegments( Review Comment: This will break backward compatibilty. We should use `SegmentConversionUtils.uploadSegment()` when `TAR push` is being used. `SegmentConversionUtils.uploadSegment()` takes some configuration from task config and we do use those. ``` String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY); int maxNumAttemptsFromConfig = maxNumAttemptsConfigStr != null ? Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS; int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, uriProvider.numAddresses()); LOGGER.info("Retry uploading for {} times. Max num attempts from pinot minion config: {}, number of IP addresses " + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, uriProvider.numAddresses()); String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY); long initialRetryDelayMs = initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS; String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY); double retryScaleFactor = retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR; RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org