apucher commented on a change in pull request #6740: URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r606376400
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java ########## @@ -63,18 +69,13 @@ private final TableConfig _tableConfig; private final Schema _schema; private final BatchConfig _batchConfig; - private final URI _controllerUri; private final File _uploadDir; - private final String _authToken; - public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri, - File uploadDir, String authToken) { + public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, File uploadDir) { Review comment: noooo. my auth token :( it should be possible to provide auth info without writing it into the table config ########## File path: pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java ########## @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.segmentuploader; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.core.util.IngestionUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementation of {@link SegmentUploader} with support for all push modes + * The configs for push are fetched from batchConfigMaps of tableConfig + */ +public class SegmentUploaderDefault implements SegmentUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUploaderDefault.class); + + private String _tableNameWithType; + private BatchConfig _batchConfig; + + @Override + public void init(TableConfig tableConfig) Review comment: similar to the prev comment. 
IMO, either (a) pass an auth token here, or add it as a header/param to the actual upload() calls below, or (b) add it as a transient property to the table config that is injected by the service. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -185,38 +176,102 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig) } /** - * Uploads the segment tar files to the provided controller + * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig + * @param tableNameWithType name of the table to upload the segment + * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc + * @param segmentTarURIs list of URI for the segment tar files */ - public static void uploadSegment(String tableNameWithType, List<File> tarFiles, URI controllerUri, - final String authToken) - throws RetriableOperationException, AttemptsExceededException { - for (File tarFile : tarFiles) { - String fileName = tarFile.getName(); - Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); - String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); - - RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> { - try (InputStream inputStream = new FileInputStream(tarFile)) { - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), segmentName, inputStream, - FileUploadDownloadClient.makeAuthHeader(authToken), - FileUploadDownloadClient.makeTableParam(tableNameWithType), - FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} - {}: {}", tableNameWithType, segmentName, - response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int
statusCode = e.getStatusCode(); - if (statusCode >= 500) { - LOGGER.warn("Caught temporary exception while pushing table: {} segment: {}, will retry", tableNameWithType, - segmentName, e); - return false; - } else { - throw e; + public static void uploadSegment(String tableNameWithType, BatchConfig batchConfig, List<URI> segmentTarURIs) Review comment: Why not pass a SegmentGenerationJobSpec here? Then any additional metadata (like auth tokens) could be passed in by the executing service without having to go into the table config. Alternatively, I guess one could put it in the table config as a transient property and not upload it. Not sure if this is a clean solution, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org