npawar commented on a change in pull request #6740:
URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r610161478



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig)
   }
 
   /**
-   * Uploads the segment tar files to the provided controller
+   * Uploads the segments from the provided segmentTar URIs to the table, 
using push details from the batchConfig
+   * @param tableNameWithType name of the table to upload the segment
+   * @param batchConfig batchConfig with details about push such as 
controllerURI, pushAttempts, pushParallelism, etc
+   * @param segmentTarURIs list of URI for the segment tar files
+   * @param authContext auth details required to upload the Pinot segment to 
controller
    */
-  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, URI controllerUri,
-      final String authToken)
-      throws RetriableOperationException, AttemptsExceededException {
-    for (File tarFile : tarFiles) {
-      String fileName = tarFile.getName();
-      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
-      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
-
-      RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, 
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
-        try (InputStream inputStream = new FileInputStream(tarFile)) {
-          SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), 
segmentName, inputStream,
-                  FileUploadDownloadClient.makeAuthHeader(authToken),
-                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
-                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
-          LOGGER.info("Response for pushing table {} segment {} - {}: {}", 
tableNameWithType, segmentName,
-              response.getStatusCode(), response.getResponse());
-          return true;
-        } catch (HttpErrorStatusException e) {
-          int statusCode = e.getStatusCode();
-          if (statusCode >= 500) {
-            LOGGER.warn("Caught temporary exception while pushing table: {} 
segment: {}, will retry", tableNameWithType,
-                segmentName, e);
-            return false;
-          } else {
-            throw e;
+  public static void uploadSegment(String tableNameWithType, BatchConfig 
batchConfig, List<URI> segmentTarURIs,
+      @Nullable AuthContext authContext)
+      throws Exception {
+
+    SegmentGenerationJobSpec segmentUploadSpec = 
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+
+    String pushMode = batchConfig.getPushMode();
+    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS,
+              
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case URI:
+        try {
+          URI outputSegmentDirURI = null;
+          if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+            outputSegmentDirURI = 
URI.create(batchConfig.getOutputSegmentDirURI());
           }
+          List<String> segmentUris = new ArrayList<>();
+          for (URI segmentTarURI : segmentTarURIs) {
+            URI updatedURI = 
SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI,
+                segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+                segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix());
+            segmentUris.add(updatedURI.toString());
+          }
+          SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case METADATA:
+        try {
+          URI outputSegmentDirURI = null;
+          if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+            outputSegmentDirURI = 
URI.create(batchConfig.getOutputSegmentDirURI());
+          }
+          PinotFS outputFileFS = getOutputPinotFS(batchConfig, 
outputSegmentDirURI);
+          Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+              .getSegmentUriToTarPathMap(outputSegmentDirURI, 
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+                  segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), 
new String[]{segmentTarURIs.toString()});
+          SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, 
outputFileFS, segmentUriToTarPathMap);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
         }
-      });
+        break;
+      default:
+        throw new UnsupportedOperationException("Unrecognized push mode - " + 
pushMode);
+    }
+  }
+
+  private static SegmentGenerationJobSpec generateSegmentUploadSpec(String 
tableName, BatchConfig batchConfig,
+      @Nullable AuthContext authContext) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI());
+    PinotClusterSpec[] pinotClusterSpecs = new 
PinotClusterSpec[]{pinotClusterSpec};
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(batchConfig.getPushAttempts());
+    pushJobSpec.setPushParallelism(batchConfig.getPushParallelism());
+    
pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis());
+    pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix());
+    pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix());
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    if (authContext != null && 
StringUtils.isNotBlank(authContext.getAuthToken())) {
+      spec.setAuthToken(authContext.getAuthToken());
+    }
+    return spec;
+  }
+
+  /**
+   * Creates an instance of the PinotFS using the fileURI and fs properties 
from BatchConfig
+   */
+  public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) 
{
+    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+    if (fileURIScheme == null) {
+      fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+    }
+    if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {

Review comment:
       Separated it into a method, `registerPinotFS`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to