zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1035630266

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());

Review Comment:
   Just curious: why do we need to create these 3 new methods? It seems that there is no logic change.


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }

+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:

Review Comment:
   I feel that we may not want to allow URI mode; it does not make much sense to use URI mode in minion tasks. WDYT?


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -82,6 +84,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
   private static final String DEFAULT_BUCKET_PERIOD = "1d";
   private static final String DEFAULT_BUFFER_PERIOD = "2d";
+  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =

Review Comment:
   It seems that this variable is not used?
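
On the URI-mode question raised in the second comment, one possible shape for the restriction is to validate the push mode up front rather than adding a `case URI:` branch. The following is only a sketch, not the PR's code: `PushModeCheck` and `parsePushMode` are hypothetical names, the nested enum is a local stand-in for `BatchConfigProperties.SegmentPushType` so that the example compiles on its own, and the choice to accept TAR and METADATA is an assumption about what minion tasks would want. It also guards against a missing push mode, which the quoted `pushMode.toUpperCase()` call would not.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

public class PushModeCheck {
  // Local stand-in for BatchConfigProperties.SegmentPushType (assumption: TAR, URI, METADATA).
  enum SegmentPushType { TAR, URI, METADATA }

  // Assumption: minion tasks accept every mode except URI.
  private static final Set<SegmentPushType> ALLOWED =
      EnumSet.of(SegmentPushType.TAR, SegmentPushType.METADATA);

  // Hypothetical helper: parses the configured push mode and rejects modes that are not allowed.
  static SegmentPushType parsePushMode(String pushMode) {
    if (pushMode == null) {
      throw new IllegalArgumentException("Push mode is not set");
    }
    // valueOf throws IllegalArgumentException for names that are not enum constants.
    SegmentPushType type = SegmentPushType.valueOf(pushMode.trim().toUpperCase(Locale.ROOT));
    if (!ALLOWED.contains(type)) {
      throw new UnsupportedOperationException("Unsupported push mode for minion tasks: " + type);
    }
    return type;
  }

  public static void main(String[] args) {
    System.out.println(parsePushMode("tar"));
    try {
      parsePushMode("uri");
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With a check like this, the `switch` in `pushSegment` would only need branches for the allowed modes, and a misconfigured task would fail with an explicit message before any file system is opened.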