npawar commented on a change in pull request #6740:
URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r610161239



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig 
segmentGeneratorConfig)
   }
 
   /**
-   * Uploads the segment tar files to the provided controller
+   * Uploads the segments from the provided segmentTar URIs to the table, 
using push details from the batchConfig
+   * @param tableNameWithType name of the table to upload the segment
+   * @param batchConfig batchConfig with details about push such as 
controllerURI, pushAttempts, pushParallelism, etc
+   * @param segmentTarURIs list of URI for the segment tar files
+   * @param authContext auth details required to upload the Pinot segment to 
controller
    */
-  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, URI controllerUri,
-      final String authToken)
-      throws RetriableOperationException, AttemptsExceededException {
-    for (File tarFile : tarFiles) {
-      String fileName = tarFile.getName();
-      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
-      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
-
-      RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, 
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
-        try (InputStream inputStream = new FileInputStream(tarFile)) {
-          SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), 
segmentName, inputStream,
-                  FileUploadDownloadClient.makeAuthHeader(authToken),
-                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
-                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
-          LOGGER.info("Response for pushing table {} segment {} - {}: {}", 
tableNameWithType, segmentName,
-              response.getStatusCode(), response.getResponse());
-          return true;
-        } catch (HttpErrorStatusException e) {
-          int statusCode = e.getStatusCode();
-          if (statusCode >= 500) {
-            LOGGER.warn("Caught temporary exception while pushing table: {} 
segment: {}, will retry", tableNameWithType,
-                segmentName, e);
-            return false;
-          } else {
-            throw e;
+  public static void uploadSegment(String tableNameWithType, BatchConfig 
batchConfig, List<URI> segmentTarURIs,
+      @Nullable AuthContext authContext)
+      throws Exception {
+
+    SegmentGenerationJobSpec segmentUploadSpec = 
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+
+    String pushMode = batchConfig.getPushMode();
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS,
+              
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);

Review comment:
       added for each case




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to