This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch basic-auth-controller
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 58d047ae0278ae710583f31d08ce17aff08ed069
Author: Alexander Pucher <a...@alexpucher.com>
AuthorDate: Fri Feb 19 12:04:48 2021 -0800

    refactor FileUploadDownloadClient
---
 .../apache/pinot/common/utils/CommonConstants.java |   6 ++
 .../common/utils/FileUploadDownloadClient.java     | 102 +++++----------------
 .../apache/pinot/core/common/MinionConstants.java  |   1 +
 .../core/data/manager/BaseTableDataManager.java    |   2 +
 .../manager/config/InstanceDataManagerConfig.java  |   2 +
 .../manager/config/TableDataManagerConfig.java     |  10 +-
 .../manager/realtime/SegmentCommitterFactory.java  |   3 +-
 .../realtime/Server2ControllerSegmentUploader.java |  10 +-
 .../ServerSegmentCompletionProtocolHandler.java    |  28 ++++--
 .../Server2ControllerSegmentUploaderTest.java      |   4 +-
 .../org/apache/pinot/minion/MinionContext.java     |   9 ++
 .../org/apache/pinot/minion/MinionStarter.java     |   3 +
 .../BaseMultipleSegmentsConversionExecutor.java    |   6 +-
 .../BaseSingleSegmentConversionExecutor.java       |   7 +-
 .../minion/taskfactory/TaskFactoryRegistry.java    |   4 +
 .../ingestion/batch/common/SegmentPushUtils.java   |  28 +++---
 .../ingestion/common/DefaultControllerRestApi.java |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |   3 +-
 .../helix/HelixInstanceDataManagerConfig.java      |   8 ++
 .../pinot/tools/backfill/BackfillSegmentUtils.java |   3 +
 .../apache/pinot/tools/utils/PinotConfigUtils.java |   3 +
 21 files changed, 130 insertions(+), 116 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 191ae93..ffa91a2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -235,6 +235,9 @@ public class CommonConstants {
         "pinot.server.instance.realtime.alloc.offheap.direct";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = 
"pinot.server.storage.factory";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = 
"pinot.server.crypter";
+
+    public static final String CONFIG_OF_AUTH_TOKEN = "auth.token";
+
     // Configuration to consider the server ServiceStatus as being STARTED if 
the percent of resources (tables) that
     // are ONLINE for this this server has crossed the threshold percentage of 
the total number of tables
     // that it is expected to serve.
@@ -304,6 +307,7 @@ public class CommonConstants {
       public static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = 
"enabled";
       public static final String CONFIG_OF_CONTROLLER_HTTPS_PORT = 
"controller.port";
       public static final String CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 
"upload.request.timeout.ms";
+      public static final String CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN = 
"upload.auth.token";
 
       public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 
300_000;
       public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000;
@@ -353,6 +357,8 @@ public class CommonConstants {
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = 
"segment.fetcher";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"segment.uploader";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter";
+
+    public static final String CONFIG_OF_TASK_AUTH_TOKEN = "task.auth.token";
   }
 
   public static class Segment {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index b4da975..f7ff600 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -46,7 +46,6 @@ import org.apache.http.StatusLine;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.entity.ContentType;
@@ -140,11 +139,13 @@ public class FileUploadDownloadClient implements 
Closeable {
     return new URI(protocol, null, host, port, path, null, null);
   }
 
+  @Deprecated
   public static URI getRetrieveTableConfigHttpURI(String host, int port, 
String rawTableName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  @Deprecated
   public static URI getDeleteSegmentHttpUri(String host, int port, String 
rawTableName, String segmentName,
       String tableType)
       throws URISyntaxException {
@@ -152,6 +153,7 @@ public class FileUploadDownloadClient implements Closeable {
         rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + 
tableType));
   }
 
+  @Deprecated
   public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int 
port, String rawTableName,
       String tableType)
       throws URISyntaxException {
@@ -159,6 +161,7 @@ public class FileUploadDownloadClient implements Closeable {
         rawTableName + TYPE_DELIMITER + tableType));
   }
 
+  @Deprecated
   public static URI getRetrieveSchemaHttpURI(String host, int port, String 
schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -169,16 +172,6 @@ public class FileUploadDownloadClient implements Closeable 
{
     return getURI(protocol, host, port, SCHEMA_PATH + "/" + schemaName);
   }
 
-  public static URI getUploadSchemaHttpURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTP, host, port, SCHEMA_PATH);
-  }
-
-  public static URI getUploadSchemaHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, SCHEMA_PATH);
-  }
-
   public static URI getUploadSchemaURI(String protocol, String host, int port)
       throws URISyntaxException {
     return getURI(protocol, host, port, SCHEMA_PATH);
@@ -189,36 +182,12 @@ public class FileUploadDownloadClient implements 
Closeable {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(), SCHEMA_PATH);
   }
 
-  /**
-   * This method calls the old segment upload endpoint. We will deprecate this 
behavior soon. Please call
-   * getUploadSegmentHttpURI to construct your request.
-   */
-  @Deprecated
-  public static URI getOldUploadSegmentHttpURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTP, host, port, OLD_SEGMENT_PATH);
-  }
-
-  /**
-   * This method calls the old segment upload endpoint. We will deprecate this 
behavior soon. Please call
-   * getUploadSegmentHttpsURI to construct your request.
-   */
   @Deprecated
-  public static URI getOldUploadSegmentHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, OLD_SEGMENT_PATH);
-  }
-
   public static URI getUploadSegmentHttpURI(String host, int port)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SEGMENT_PATH);
   }
 
-  public static URI getUploadSegmentHttpsURI(String host, int port)
-      throws URISyntaxException {
-    return getURI(HTTPS, host, port, SEGMENT_PATH);
-  }
-
   public static URI getUploadSegmentURI(String protocol, String host, int port)
       throws URISyntaxException {
     return getURI(protocol, host, port, SEGMENT_PATH);
@@ -257,12 +226,14 @@ public class FileUploadDownloadClient implements 
Closeable {
     return requestBuilder.build();
   }
 
+  @Deprecated
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = 
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  @Deprecated
   private static HttpUriRequest constructDeleteRequest(URI uri) {
     RequestBuilder requestBuilder = 
RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
@@ -275,11 +246,6 @@ public class FileUploadDownloadClient implements Closeable 
{
         DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
-  private static HttpUriRequest getUpdateSchemaRequest(URI uri, String 
schemaName, File schemaFile) {
-    return getUploadFileRequest(HttpPut.METHOD_NAME, uri, 
getContentBody(schemaName, schemaFile), null, null,
-        DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
   private static HttpUriRequest getUploadSegmentRequest(URI uri, String 
segmentName, File segmentFile,
       @Nullable List<Header> headers, @Nullable List<NameValuePair> 
parameters, int socketTimeoutMs) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, 
getContentBody(segmentName, segmentFile), headers,
@@ -299,7 +265,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, 
Map<String, File> metadataFiles,
-      int segmentUploadRequestTimeoutMs) {
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> 
parameters, int segmentUploadRequestTimeoutMs) {
     MultipartEntityBuilder multipartEntityBuilder = 
MultipartEntityBuilder.create().
         setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
     for (Map.Entry<String, File> entry : metadataFiles.entrySet()) {
@@ -310,6 +276,7 @@ public class FileUploadDownloadClient implements Closeable {
     // Build the POST request.
     RequestBuilder requestBuilder =
         
RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    addHeadersAndParameters(requestBuilder, headers, parameters);
     setTimeout(requestBuilder, segmentUploadRequestTimeoutMs);
     return requestBuilder.build();
   }
@@ -334,16 +301,13 @@ public class FileUploadDownloadClient implements 
Closeable {
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, 
int socketTimeoutMs) {
+  private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, 
@Nullable List<Header> headers,
+      @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = 
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, socketTimeoutMs);
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getDownloadFileRequest(URI uri, int 
socketTimeoutMs) {
-    return getDownloadFileRequest(uri, socketTimeoutMs, null);
-  }
-
   private static HttpUriRequest getDownloadFileRequest(URI uri, int 
socketTimeoutMs, String authToken) {
     RequestBuilder requestBuilder = 
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     if (StringUtils.isNotBlank(authToken)) {
@@ -431,11 +395,13 @@ public class FileUploadDownloadClient implements 
Closeable {
     return errorMessage;
   }
 
+  @Deprecated
   public SimpleHttpResponse sendGetRequest(URI uri)
       throws IOException, HttpErrorStatusException {
     return sendRequest(constructGetRequest(uri));
   }
 
+  @Deprecated
   public SimpleHttpResponse sendDeleteRequest(URI uri)
       throws IOException, HttpErrorStatusException {
     return sendRequest(constructDeleteRequest(uri));
@@ -481,9 +447,10 @@ public class FileUploadDownloadClient implements Closeable 
{
 
   // Upload a set of segment metadata files (e.g., meta.properties and 
creation.meta) to controllers.
   public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, 
File> metadataFiles,
-      int segmentUploadRequestTimeoutMs)
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> 
parameters, int segmentUploadRequestTimeoutMs)
       throws IOException, HttpErrorStatusException {
-    return sendRequest(getUploadSegmentMetadataFilesRequest(uri, 
metadataFiles, segmentUploadRequestTimeoutMs));
+    return sendRequest(
+        getUploadSegmentMetadataFilesRequest(uri, metadataFiles, headers, 
parameters, segmentUploadRequestTimeoutMs));
   }
 
   /**
@@ -513,6 +480,8 @@ public class FileUploadDownloadClient implements Closeable {
   /**
    * Upload segment with segment file using default settings. Include table 
name as a request parameter.
    *
+   * NOTE: does not support auth tokens
+   *
    * @param uri URI
    * @param segmentName Segment name
    * @param segmentFile Segment file
@@ -521,6 +490,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File 
segmentFile, String tableName)
       throws IOException, HttpErrorStatusException {
     // Add table name as a request parameter
@@ -580,6 +550,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse sendSegmentUri(URI uri, String downloadUri, String 
rawTableName)
       throws IOException, HttpErrorStatusException {
     // Add table name as a request parameter
@@ -615,6 +586,7 @@ public class FileUploadDownloadClient implements Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
+  @Deprecated
   public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString)
       throws IOException, HttpErrorStatusException {
     return sendSegmentJson(uri, jsonString, null, null, 
DEFAULT_SOCKET_TIMEOUT_MS);
@@ -629,24 +601,10 @@ public class FileUploadDownloadClient implements 
Closeable {
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, int 
socketTimeoutMs)
-      throws IOException, HttpErrorStatusException {
-    return sendRequest(getSegmentCompletionProtocolRequest(uri, 
socketTimeoutMs));
-  }
-
-  /**
-   * Download a file using default settings.
-   *
-   * @param uri URI
-   * @param socketTimeoutMs Socket timeout in milliseconds
-   * @param dest File destination
-   * @return Response status code
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public int downloadFile(URI uri, int socketTimeoutMs, File dest)
+  public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, 
@Nullable List<Header> headers,
+      @Nullable List<NameValuePair> parameters, int socketTimeoutMs)
       throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, socketTimeoutMs, dest, null);
+    return sendRequest(getSegmentCompletionProtocolRequest(uri, headers, 
parameters, socketTimeoutMs));
   }
 
   /**
@@ -690,20 +648,6 @@ public class FileUploadDownloadClient implements Closeable 
{
   }
 
   /**
-   * Download a file.
-   *
-   * @param uri URI
-   * @param dest File destination
-   * @return Response status code
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public int downloadFile(URI uri, File dest)
-      throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest);
-  }
-
-  /**
    * Download a file, with an optional auth token.
    *
    * @param uri URI
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index d9d1a9d..c2ce030 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -29,6 +29,7 @@ public class MinionConstants {
   public static final String DOWNLOAD_URL_KEY = "downloadURL";
   public static final String UPLOAD_URL_KEY = "uploadURL";
   public static final String URL_SEPARATOR = ",";
+  public static final String AUTH_TOKEN = "authToken";
 
   /**
    * When minion downloads a segment to do work on, we will save that CRC. We 
will send that to the controller in an
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1381e61..a1c1fc3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -55,6 +55,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected File _indexDir;
   protected Logger _logger;
   protected HelixManager _helixManager;
+  protected String _authToken;
 
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String 
instanceId,
@@ -66,6 +67,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     _propertyStore = propertyStore;
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
+    _authToken = tableDataManagerConfig.getAuthToken();
 
     _tableNameWithType = tableDataManagerConfig.getTableName();
     _tableDataDir = tableDataManagerConfig.getDataDir();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index 6422140..cf2aeb9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -50,4 +50,6 @@ public interface InstanceDataManagerConfig {
   boolean isDirectRealtimeOffHeapAllocation();
 
   int getMaxParallelSegmentBuilds();
+
+  String getAuthToken();
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
index ca0de52..577d795 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -36,6 +37,7 @@ public class TableDataManagerConfig {
   private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY = 
"consumerDirectory";
   private static final String TABLE_DATA_MANAGER_NAME = "name";
   private static final String TABLE_IS_DIMENSION = "isDimTable";
+  private static final String TABLE_DATA_MANGER_AUTH_TOKEN = "authToken";
 
   private final Configuration _tableDataManagerConfig;
 
@@ -67,6 +69,10 @@ public class TableDataManagerConfig {
     return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION);
   }
 
+  public String getAuthToken() {
+    return _tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN);
+  }
+
   public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
       @Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull 
String tableNameWithType) {
     Configuration defaultConfig = new PropertiesConfiguration();
@@ -77,14 +83,16 @@ public class TableDataManagerConfig {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     Preconditions.checkNotNull(tableType);
     defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name());
+    defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, 
instanceDataManagerConfig.getAuthToken());
 
     return new TableDataManagerConfig(defaultConfig);
   }
 
-  public void overrideConfigs(@Nonnull TableConfig tableConfig) {
+  public void overrideConfigs(@Nonnull TableConfig tableConfig, String 
authToken) {
     // Override table level configs
 
     _tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION, 
tableConfig.isDimTable());
+    _tableDataManagerConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, 
authToken);
 
     // If we wish to override some table level configs using table config, 
override them here
     // Note: the configs in TableDataManagerConfig is immutable once the table 
is created, which mean it will not pick
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 2d8154e..9cb5704 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -63,7 +63,8 @@ public class SegmentCommitterFactory {
 
     segmentUploader = new Server2ControllerSegmentUploader(LOGGER, 
_protocolHandler.getFileUploadDownloadClient(),
         _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), 
params.getSegmentName(),
-        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics);
+        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics,
+        _protocolHandler.getAuthToken());
     return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, 
segmentUploader);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 35084aa..ac46ee8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -39,10 +39,11 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
   private final String _segmentName;
   private final int _segmentUploadRequestTimeoutMs;
   private final ServerMetrics _serverMetrics;
+  private final String _authToken;
 
   public Server2ControllerSegmentUploader(Logger segmentLogger, 
FileUploadDownloadClient fileUploadDownloadClient,
       String controllerSegmentUploadCommitUrl, String segmentName, int 
segmentUploadRequestTimeoutMs,
-      ServerMetrics serverMetrics)
+      ServerMetrics serverMetrics, String authToken)
       throws URISyntaxException {
     _segmentLogger = segmentLogger;
     _fileUploadDownloadClient = fileUploadDownloadClient;
@@ -50,10 +51,11 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
     _segmentName = segmentName;
     _segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs;
     _serverMetrics = serverMetrics;
+    _authToken = authToken;
   }
 
   @Override
-  public URI uploadSegment(File segmentFile,  LLCSegmentName segmentName) {
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
     SegmentCompletionProtocol.Response response = 
uploadSegmentToController(segmentFile);
     if (response.getStatus() == 
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
       try {
@@ -69,8 +71,8 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, 
segmentFile, null, null,
-              _segmentUploadRequestTimeoutMs).getResponse();
+          .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, 
segmentFile,
+              FileUploadDownloadClient.makeAuthHeader(_authToken), null, 
_segmentUploadRequestTimeoutMs).getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       _segmentLogger.info("Controller response {} for {}", 
response.toJsonString(), _controllerSegmentUploadCommitUrl);
       if 
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
 {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index c419105..5f24486 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol.*;
+
 
 /**
  * A class that handles sending segment completion protocol requests to the 
controller and getting
@@ -58,6 +60,7 @@ public class ServerSegmentCompletionProtocolHandler {
   private static SSLContext _sslContext;
   private static Integer _controllerHttpsPort;
   private static int _segmentUploadRequestTimeoutMs;
+  private static String _authToken;
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
@@ -69,8 +72,9 @@ public class ServerSegmentCompletionProtocolHandler {
       _sslContext = new 
ClientSSLContextGenerator(httpsConfig.subset(CommonConstants.PREFIX_OF_SSL_SUBSET)).generate();
       _controllerHttpsPort = 
httpsConfig.getProperty(CONFIG_OF_CONTROLLER_HTTPS_PORT, Integer.class);
     }
-    _segmentUploadRequestTimeoutMs =
-        
uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, 
DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
+    _segmentUploadRequestTimeoutMs = uploaderConfig
+        .getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, 
DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
+    _authToken = 
uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN);
   }
 
   public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, 
String tableNameWithType) {
@@ -87,6 +91,10 @@ public class ServerSegmentCompletionProtocolHandler {
     return _fileUploadDownloadClient;
   }
 
+  public String getAuthToken() {
+    return _authToken;
+  }
+
   public SegmentCompletionProtocol.Response 
segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
     SegmentCompletionProtocol.SegmentCommitStartRequest request =
         new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
@@ -146,10 +154,11 @@ public class ServerSegmentCompletionProtocolHandler {
       return SegmentCompletionProtocol.RESP_NOT_SENT;
     }
 
-    Server2ControllerSegmentUploader segmentUploader= null;
+    Server2ControllerSegmentUploader segmentUploader = null;
     try {
-      segmentUploader = new Server2ControllerSegmentUploader(LOGGER,
-          _fileUploadDownloadClient, url, params.getSegmentName(), 
_segmentUploadRequestTimeoutMs, _serverMetrics);
+      segmentUploader =
+          new Server2ControllerSegmentUploader(LOGGER, 
_fileUploadDownloadClient, url, params.getSegmentName(),
+              _segmentUploadRequestTimeoutMs, _serverMetrics, _authToken);
     } catch (URISyntaxException e) {
       LOGGER.error("Segment commit upload url error: ", e);
       return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -203,9 +212,9 @@ public class ServerSegmentCompletionProtocolHandler {
   private SegmentCompletionProtocol.Response sendRequest(String url) {
     SegmentCompletionProtocol.Response response;
     try {
-      String responseStr =
-          _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new 
URI(url), DEFAULT_OTHER_REQUESTS_TIMEOUT)
-              .getResponse();
+      String responseStr = _fileUploadDownloadClient
+          .sendSegmentCompletionProtocolRequest(new URI(url), 
FileUploadDownloadClient.makeAuthHeader(_authToken), null,
+              DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
       if 
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
 {
@@ -228,7 +237,8 @@ public class ServerSegmentCompletionProtocolHandler {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, 
_segmentUploadRequestTimeoutMs).getResponse();
+          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, 
FileUploadDownloadClient.makeAuthHeader(_authToken),
+              null, _segmentUploadRequestTimeoutMs).getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
       if 
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
 {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 85f84f1..86a7088 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -91,7 +91,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class));
+            10000, mock(ServerMetrics.class), null);
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
   }
@@ -101,7 +101,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class));
+            10000, mock(ServerMetrics.class), null);
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
index 35e58fa..6ba1d67 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
@@ -45,6 +45,7 @@ public class MinionContext {
 
   // For segment upload
   private SSLContext _sslContext;
+  private String _taskAuthToken;
 
   // For PurgeTask
   private SegmentPurger.RecordPurgerFactory _recordPurgerFactory;
@@ -97,4 +98,12 @@ public class MinionContext {
   public void setRecordModifierFactory(SegmentPurger.RecordModifierFactory 
recordModifierFactory) {
     _recordModifierFactory = recordModifierFactory;
   }
+
+  public String getTaskAuthToken() {
+    return _taskAuthToken;
+  }
+
+  public void setTaskAuthToken(String taskAuthToken) {
+    _taskAuthToken = taskAuthToken;
+  }
 }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 015b8e2..37aa51d 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -160,6 +160,9 @@ public class MinionStarter implements ServiceStartable {
     minionMetrics.initializeGlobalMeters();
     minionContext.setMinionMetrics(minionMetrics);
 
+    // initialize authentication
+    
minionContext.setTaskAuthToken(_config.getProperty(CommonConstants.Minion.CONFIG_OF_TASK_AUTH_TOKEN));
+
     // Start all components
     LOGGER.info("Initializing PinotFSFactory");
     PinotConfiguration pinotFSConfig = 
_config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index ee7be9e..f9e7ebe 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -88,6 +88,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor 
extends BaseTaskExe
     String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = 
downloadURLString.split(MinionConstants.URL_SEPARATOR);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
+    String authToken = configs.get(MinionConstants.AUTH_TOKEN);
 
     LOGGER.info("Start executing {} on table: {}, input segments: {} with 
downloadURLs: {}, uploadURL: {}", taskType,
         tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
@@ -149,8 +150,9 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             TableNameBuilder.extractRawTableName(tableNameWithType));
         List<NameValuePair> parameters = 
Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
-        SegmentConversionUtils.uploadSegment(configs, null, parameters, 
tableNameWithType, resultSegmentName, uploadURL,
-            convertedTarredSegmentFile);
+        SegmentConversionUtils
+            .uploadSegment(configs, 
FileUploadDownloadClient.makeAuthHeader(authToken), parameters, 
tableNameWithType,
+                resultSegmentName, uploadURL, convertedTarredSegmentFile);
       }
 
       String outputSegmentNames = 
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
index 0b94a23..4f9d779 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.minion.executor;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,7 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
     String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     String originalSegmentCrc = 
configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
+    String authToken = configs.get(MinionConstants.AUTH_TOKEN);
 
     LOGGER.info("Start executing {} on table: {}, segment: {} with 
downloadURL: {}, uploadURL: {}", taskType,
         tableNameWithType, segmentName, downloadURL, uploadURL);
@@ -121,7 +123,10 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
           new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
               segmentZKMetadataCustomMapModifier.toJsonString());
 
-      List<Header> httpHeaders = Arrays.asList(ifMatchHeader, 
segmentZKMetadataCustomMapModifierHeader);
+      List<Header> httpHeaders = new ArrayList<>();
+      httpHeaders.add(ifMatchHeader);
+      httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+      httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
 
       // Set parameters for upload request.
       NameValuePair enableParallelPushProtectionParameter =
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index ce5c840..b8adc80 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -25,6 +25,7 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
@@ -71,6 +72,9 @@ public class TaskFactoryRegistry {
               MinionMetrics minionMetrics = 
MinionContext.getInstance().getMinionMetrics();
 
               PinotTaskConfig pinotTaskConfig = 
PinotTaskConfig.fromHelixTaskConfig(_taskConfig);
+              pinotTaskConfig.getConfigs()
+                  .put(MinionConstants.AUTH_TOKEN, 
MinionContext.getInstance().getTaskAuthToken());
+
               _eventObserver.notifyTaskStart(pinotTaskConfig);
               minionMetrics.addMeteredTableValue(taskType, 
MinionMeter.NUMBER_TASKS_EXECUTED, 1L);
               LOGGER.info("Start running {}: {} with configs: {}", 
pinotTaskConfig.getTaskType(), _taskConfig.getId(),
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 5cc69ba..43efce6 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.plugin.ingestion.batch.common;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -33,9 +33,7 @@ import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
-import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
@@ -177,7 +175,10 @@ public class SegmentPushUtils implements Serializable {
         RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
           try {
             SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-                
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentUri, tableName);
+                
.sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), 
segmentUri,
+                    
FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()),
+                    FileUploadDownloadClient.makeTableParam(tableName),
+                    FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response for pushing table {} segment uri {} to 
location {} - {}: {}", tableName, segmentUri,
                 controllerURI, response.getStatusCode(), 
response.getResponse());
             return true;
@@ -248,17 +249,18 @@ public class SegmentPushUtils implements Serializable {
           }
           RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
             try {
-              List<Header> headers = ImmutableList
-                  .of(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, 
segmentUriPath),
-                      new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-                          
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-              // Add table name as a request parameter
-              NameValuePair tableNameValuePair =
-                  new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName);
-              List<NameValuePair> parameters = 
Arrays.asList(tableNameValuePair);
+              List<Header> headers = new ArrayList<>();
+              headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, 
segmentUriPath));
+              headers.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+                  
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+              if (StringUtils.isNotBlank(spec.getAuthToken())) {
+                
headers.addAll(FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()));
+              }
+
               SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
                   
.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
 segmentName,
-                      segmentMetadataFile, headers, parameters, 
FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+                      segmentMetadataFile, headers, 
FileUploadDownloadClient.makeTableParam(tableName),
+                      FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
               LOGGER.info("Response for pushing table {} segment {} to 
location {} - {}: {}", tableName, segmentName,
                   controllerURI, response.getStatusCode(), 
response.getResponse());
               return true;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
index 391d33b..43db07d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -118,8 +117,7 @@ public class DefaultControllerRestApi implements 
ControllerRestApi {
           try (InputStream inputStream = fileSystem.open(tarFilePath)) {
             SimpleHttpResponse response = 
_fileUploadDownloadClient.uploadSegment(
                 
FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), 
pushLocation.getPort()),
-                segmentName, inputStream, Collections.emptyList(),
-                FileUploadDownloadClient.makeTableParam(_rawTableName),
+                segmentName, inputStream, null, 
FileUploadDownloadClient.makeTableParam(_rawTableName),
                 FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
             LOGGER.info("Response {}: {}", response.getStatusCode(), 
response.getResponse());
             break;
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 162ba71..7cfc19b 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -82,6 +82,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     _instanceId = _instanceDataManagerConfig.getInstanceId();
     _helixManager = helixManager;
     _serverMetrics = serverMetrics;
+    _authToken = 
config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN);
 
     File instanceDataDir = new 
File(_instanceDataManagerConfig.getInstanceDataDir());
     if (!instanceDataDir.exists()) {
@@ -138,7 +139,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     LOGGER.info("Creating table data manager for table: {}", 
tableNameWithType);
     TableDataManagerConfig tableDataManagerConfig =
         
TableDataManagerConfig.getDefaultHelixTableDataManagerConfig(_instanceDataManagerConfig,
 tableNameWithType);
-    tableDataManagerConfig.overrideConfigs(tableConfig);
+    tableDataManagerConfig.overrideConfigs(tableConfig, _authToken);
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, _instanceId, 
_propertyStore, _serverMetrics, _helixManager);
     tableDataManager.start();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 41c2744..0e4f60f 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix;
 import java.util.Optional;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.http.auth.AUTH;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
@@ -60,6 +61,8 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   public static final String SEGMENT_FORMAT_VERSION = "segment.format.version";
   // Key of whether to enable reloading consuming segments
   public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = 
"reload.consumingSegment";
+  // Key of the auth token
+  public static final String AUTH_TOKEN = "auth.token";
 
   // Key of how many parallel realtime segments can be built.
   // A value of <= 0 indicates unlimited.
@@ -195,6 +198,11 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   }
 
   @Override
+  public String getAuthToken() {
+    return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN);
+  }
+
+  @Override
   public String toString() {
     String configString = "";
     configString += "Instance Id: " + getInstanceId();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
index 7d230ea..254a7be 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
@@ -152,7 +152,10 @@ public class BackfillSegmentUtils {
 
   /**
    * Uploads the segment tar to the controller.
+   *
+   * NOTE: this method does not support auth tokens
    */
+  @Deprecated
   public boolean uploadSegment(String rawTableName, String segmentName, File 
segmentDir, File outputDir) {
     boolean success = true;
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 17338af..047faca 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -178,6 +178,8 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, 
serverDataDir);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, 
serverSegmentDir);
     properties.put("pinot.server.segment.fetcher.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("pinot.server.segment.upload.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("pinot.server.instance.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }
@@ -191,6 +193,7 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost);
     properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? 
minionPort : getAvailablePort());
     properties.put("segment.fetcher.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA==");
+    properties.put("task.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA==");
 
     return properties;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to