apucher commented on a change in pull request #6740:
URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r606376400



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
##########
@@ -63,18 +69,13 @@
   private final TableConfig _tableConfig;
   private final Schema _schema;
   private final BatchConfig _batchConfig;
-  private final URI _controllerUri;
   private final File _uploadDir;
-  private final String _authToken;
 
-  public FileIngestionHelper(TableConfig tableConfig, Schema schema, 
BatchConfig batchConfig, URI controllerUri,
-      File uploadDir, String authToken) {
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema, 
BatchConfig batchConfig, File uploadDir) {

Review comment:
       noooo. my auth token :(
   
   it should be possible to provide auth info without writing it into the table 
config

##########
File path: 
pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.segmentuploader;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.core.util.IngestionUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default implementation of {@link SegmentUploader} with support for all push 
modes
+ * The configs for push are fetched from batchConfigMaps of tableConfig
+ */
+public class SegmentUploaderDefault implements SegmentUploader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentUploaderDefault.class);
+
+  private String _tableNameWithType;
+  private BatchConfig _batchConfig;
+
+  @Override
+  public void init(TableConfig tableConfig)

Review comment:
       similar to the prev comment. imo either (a) pass an auth token here / 
add it as header/param to the actual upload() calls below or (b) add it as a 
transient property to table config that's injected by the service

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -185,38 +176,102 @@ public static String buildSegment(SegmentGeneratorConfig 
segmentGeneratorConfig)
   }
 
   /**
-   * Uploads the segment tar files to the provided controller
+   * Uploads the segments from the provided segmentTar URIs to the table, 
using push details from the batchConfig
+   * @param tableNameWithType name of the table to upload the segment
+   * @param batchConfig batchConfig with details about push such as 
controllerURI, pushAttempts, pushParallelism, etc
+   * @param segmentTarURIs list of URI for the segment tar files
    */
-  public static void uploadSegment(String tableNameWithType, List<File> 
tarFiles, URI controllerUri,
-      final String authToken)
-      throws RetriableOperationException, AttemptsExceededException {
-    for (File tarFile : tarFiles) {
-      String fileName = tarFile.getName();
-      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
-      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
-
-      RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS, 
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
-        try (InputStream inputStream = new FileInputStream(tarFile)) {
-          SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-              
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri), 
segmentName, inputStream,
-                  FileUploadDownloadClient.makeAuthHeader(authToken),
-                  FileUploadDownloadClient.makeTableParam(tableNameWithType),
-                  FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
-          LOGGER.info("Response for pushing table {} segment {} - {}: {}", 
tableNameWithType, segmentName,
-              response.getStatusCode(), response.getResponse());
-          return true;
-        } catch (HttpErrorStatusException e) {
-          int statusCode = e.getStatusCode();
-          if (statusCode >= 500) {
-            LOGGER.warn("Caught temporary exception while pushing table: {} 
segment: {}, will retry", tableNameWithType,
-                segmentName, e);
-            return false;
-          } else {
-            throw e;
+  public static void uploadSegment(String tableNameWithType, BatchConfig 
batchConfig, List<URI> segmentTarURIs)

Review comment:
       Why not pass a SegmentGenerationJobSpec here? Then, any additional 
metadata (like auth tokens) can be passed in by the executing service, without 
having to go into the table config
   
   alternatively, I guess one could put it in the tableconfig as a transient 
property and not upload it. not sure if this is a clean solution though




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to