zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1035630266


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());

Review Comment:
   Just curious: why do we need to create these 3 new methods? It seems that 
there is no logic change.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:

Review Comment:
   I think we may not want to allow URI mode here; it does not make much sense 
to use URI mode in minion tasks. WDYT?
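
   To make the suggestion concrete, here is a minimal, self-contained sketch 
of rejecting URI mode up front rather than handling it in the switch. The 
class `PushModeCheck`, the method `resolvePushMode`, and the local enum are 
hypothetical stand-ins for illustration only; the real code would use 
`BatchConfigProperties.SegmentPushType`, and the `METADATA` value is assumed 
here since this hunk only shows `TAR` and `URI`.

```java
import java.util.Locale;

class PushModeCheck {
  // Hypothetical stand-in for BatchConfigProperties.SegmentPushType.
  // TAR and URI appear in the diff above; METADATA is an assumption.
  enum SegmentPushType { TAR, URI, METADATA }

  // Parses the configured push mode and rejects URI mode for minion tasks.
  static SegmentPushType resolvePushMode(String pushMode) {
    SegmentPushType type = SegmentPushType.valueOf(pushMode.toUpperCase(Locale.ROOT));
    if (type == SegmentPushType.URI) {
      throw new UnsupportedOperationException(
          "URI push mode is not supported in minion tasks");
    }
    return type;
  }
}
```

   With something like this, a misconfigured task would fail fast with a clear 
message instead of reaching a push path that is not meaningful for minions.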



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -82,6 +84,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
 
   private static final String DEFAULT_BUCKET_PERIOD = "1d";
   private static final String DEFAULT_BUFFER_PERIOD = "2d";
+  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =

Review Comment:
   It seems that this variable is not used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

