snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1039916803


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,84 @@ public List<SegmentConversionResult> 
executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, 
taskConfigs, pushJobSpec);
+
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+          SegmentPushUtils.pushSegments(

Review Comment:
   This will break backward compatibilty. We should use 
`SegmentConversionUtils.uploadSegment()` when `TAR push` is being used.
   
   `SegmentConversionUtils.uploadSegment()` takes some configuration from task 
config and we do use those.
   
   ```
       String maxNumAttemptsConfigStr = 
configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
       int maxNumAttemptsFromConfig =
           maxNumAttemptsConfigStr != null ? 
Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS;
       int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, 
uriProvider.numAddresses());
       LOGGER.info("Retry uploading for {} times. Max num attempts from pinot 
minion config: {}, number of IP addresses "
           + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, 
uriProvider.numAddresses());
       String initialRetryDelayMsConfig = 
configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
       long initialRetryDelayMs =
           initialRetryDelayMsConfig != null ? 
Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
       String retryScaleFactorConfig = 
configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
       double retryScaleFactor =
           retryScaleFactorConfig != null ? 
Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
       RetryPolicy retryPolicy =
           RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, 
initialRetryDelayMs, retryScaleFactor);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to