snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1040096535


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,89 @@ public List<SegmentConversionResult> 
executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, 
taskConfigs, pushJobSpec);
+
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          File tarFile = new File(outputSegmentTarURI);
+          String segmentName = segmentConversionResult.getSegmentName();
+          String tableNameWithType = 
segmentConversionResult.getTableNameWithType();
+          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+          SegmentConversionUtils.uploadSegment(taskConfigs, headers, 
parameters, tableNameWithType, segmentName,
+              uploadURL, tarFile);
+        } catch (RetriableOperationException | AttemptsExceededException e) {

Review Comment:
   Why do we need to catch specific exceptions and wrap it with 
`RuntimeException` here? Our top caller function's definition: `executeTask() 
throw Exception`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,89 @@ public List<SegmentConversionResult> 
executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, 
taskConfigs, pushJobSpec);
+
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          File tarFile = new File(outputSegmentTarURI);
+          String segmentName = segmentConversionResult.getSegmentName();
+          String tableNameWithType = 
segmentConversionResult.getTableNameWithType();
+          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+          SegmentConversionUtils.uploadSegment(taskConfigs, headers, 
parameters, tableNameWithType, segmentName,
+              uploadURL, tarFile);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case METADATA:
+        if 
(taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+          URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+          try (PinotFS outputFileFS = 
MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+            Map<String, String> segmentUriToTarPathMap =
+                
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {

Review Comment:
   same here 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to