npawar commented on a change in pull request #6740: URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r610161478
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files + * @param authContext auth details required to upload the Pinot segment to controller */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception 
while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs, + @Nullable AuthContext authContext) + throws Exception { + + SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext); + + String pushMode = batchConfig.getPushMode(); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, + segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList())); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case URI: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); } + List<String> segmentUris = new ArrayList<>(); + for (URI segmentTarURI : segmentTarURIs) { + URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI, + segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); + } + SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case METADATA: + try { + URI outputSegmentDirURI = null; + if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) { + outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); + } + PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); + Map<String, String> segmentUriToTarPathMap = SegmentPushUtils + .getSegmentUriToTarPathMap(outputSegmentDirURI, 
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(), + segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(), new String[]{segmentTarURIs.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); } - }); + break; + default: + throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + } + } + + private static SegmentGenerationJobSpec generateSegmentUploadSpec(String tableName, BatchConfig batchConfig, + @Nullable AuthContext authContext) { + + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(tableName); + + PinotClusterSpec pinotClusterSpec = new PinotClusterSpec(); + pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI()); + PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec}; + + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(batchConfig.getPushAttempts()); + pushJobSpec.setPushParallelism(batchConfig.getPushParallelism()); + pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis()); + pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix()); + pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix()); + + SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec(); + spec.setPushJobSpec(pushJobSpec); + spec.setTableSpec(tableSpec); + spec.setPinotClusterSpecs(pinotClusterSpecs); + if (authContext != null && StringUtils.isNotBlank(authContext.getAuthToken())) { + spec.setAuthToken(authContext.getAuthToken()); + } + return spec; + } + + /** + * Creates an instance of the PinotFS using the fileURI and fs properties from BatchConfig + */ + public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI) { + String fileURIScheme = (fileURI == null) ? 
null : fileURI.getScheme(); + if (fileURIScheme == null) { + fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) { Review comment: Separated this logic into a new method, registerPinotFS. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org