This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch basic-auth-controller in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 58d047ae0278ae710583f31d08ce17aff08ed069 Author: Alexander Pucher <a...@alexpucher.com> AuthorDate: Fri Feb 19 12:04:48 2021 -0800 refactor FileUploadDownloadClient --- .../apache/pinot/common/utils/CommonConstants.java | 6 ++ .../common/utils/FileUploadDownloadClient.java | 102 +++++---------------- .../apache/pinot/core/common/MinionConstants.java | 1 + .../core/data/manager/BaseTableDataManager.java | 2 + .../manager/config/InstanceDataManagerConfig.java | 2 + .../manager/config/TableDataManagerConfig.java | 10 +- .../manager/realtime/SegmentCommitterFactory.java | 3 +- .../realtime/Server2ControllerSegmentUploader.java | 10 +- .../ServerSegmentCompletionProtocolHandler.java | 28 ++++-- .../Server2ControllerSegmentUploaderTest.java | 4 +- .../org/apache/pinot/minion/MinionContext.java | 9 ++ .../org/apache/pinot/minion/MinionStarter.java | 3 + .../BaseMultipleSegmentsConversionExecutor.java | 6 +- .../BaseSingleSegmentConversionExecutor.java | 7 +- .../minion/taskfactory/TaskFactoryRegistry.java | 4 + .../ingestion/batch/common/SegmentPushUtils.java | 28 +++--- .../ingestion/common/DefaultControllerRestApi.java | 4 +- .../starter/helix/HelixInstanceDataManager.java | 3 +- .../helix/HelixInstanceDataManagerConfig.java | 8 ++ .../pinot/tools/backfill/BackfillSegmentUtils.java | 3 + .../apache/pinot/tools/utils/PinotConfigUtils.java | 3 + 21 files changed, 130 insertions(+), 116 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 191ae93..ffa91a2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -235,6 +235,9 @@ public class CommonConstants { "pinot.server.instance.realtime.alloc.offheap.direct"; public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory"; 
public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter"; + + public static final String CONFIG_OF_AUTH_TOKEN = "auth.token"; + // Configuration to consider the server ServiceStatus as being STARTED if the percent of resources (tables) that // are ONLINE for this this server has crossed the threshold percentage of the total number of tables // that it is expected to serve. @@ -304,6 +307,7 @@ public class CommonConstants { public static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = "enabled"; public static final String CONFIG_OF_CONTROLLER_HTTPS_PORT = "controller.port"; public static final String CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = "upload.request.timeout.ms"; + public static final String CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN = "upload.auth.token"; public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 300_000; public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000; @@ -353,6 +357,8 @@ public class CommonConstants { public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = "segment.fetcher"; public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "segment.uploader"; public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter"; + + public static final String CONFIG_OF_TASK_AUTH_TOKEN = "task.auth.token"; } public static class Segment { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index b4da975..f7ff600 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -46,7 +46,6 @@ import org.apache.http.StatusLine; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; -import 
org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; import org.apache.http.entity.ContentType; @@ -140,11 +139,13 @@ public class FileUploadDownloadClient implements Closeable { return new URI(protocol, null, host, port, path, null, null); } + @Deprecated public static URI getRetrieveTableConfigHttpURI(String host, int port, String rawTableName) throws URISyntaxException { return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName); } + @Deprecated public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName, String tableType) throws URISyntaxException { @@ -152,6 +153,7 @@ public class FileUploadDownloadClient implements Closeable { rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType)); } + @Deprecated public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException { @@ -159,6 +161,7 @@ public class FileUploadDownloadClient implements Closeable { rawTableName + TYPE_DELIMITER + tableType)); } + @Deprecated public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName) throws URISyntaxException { return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName); @@ -169,16 +172,6 @@ public class FileUploadDownloadClient implements Closeable { return getURI(protocol, host, port, SCHEMA_PATH + "/" + schemaName); } - public static URI getUploadSchemaHttpURI(String host, int port) - throws URISyntaxException { - return getURI(HTTP, host, port, SCHEMA_PATH); - } - - public static URI getUploadSchemaHttpsURI(String host, int port) - throws URISyntaxException { - return getURI(HTTPS, host, port, SCHEMA_PATH); - } - public static URI getUploadSchemaURI(String protocol, String host, int port) throws URISyntaxException { return getURI(protocol, host, port, SCHEMA_PATH); @@ -189,36 +182,12 @@ public 
class FileUploadDownloadClient implements Closeable { return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SCHEMA_PATH); } - /** - * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call - * getUploadSegmentHttpURI to construct your request. - */ - @Deprecated - public static URI getOldUploadSegmentHttpURI(String host, int port) - throws URISyntaxException { - return getURI(HTTP, host, port, OLD_SEGMENT_PATH); - } - - /** - * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call - * getUploadSegmentHttpsURI to construct your request. - */ @Deprecated - public static URI getOldUploadSegmentHttpsURI(String host, int port) - throws URISyntaxException { - return getURI(HTTPS, host, port, OLD_SEGMENT_PATH); - } - public static URI getUploadSegmentHttpURI(String host, int port) throws URISyntaxException { return getURI(HTTP, host, port, SEGMENT_PATH); } - public static URI getUploadSegmentHttpsURI(String host, int port) - throws URISyntaxException { - return getURI(HTTPS, host, port, SEGMENT_PATH); - } - public static URI getUploadSegmentURI(String protocol, String host, int port) throws URISyntaxException { return getURI(protocol, host, port, SEGMENT_PATH); @@ -257,12 +226,14 @@ public class FileUploadDownloadClient implements Closeable { return requestBuilder.build(); } + @Deprecated private static HttpUriRequest constructGetRequest(URI uri) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS); return requestBuilder.build(); } + @Deprecated private static HttpUriRequest constructDeleteRequest(URI uri) { RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1); setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS); @@ -275,11 +246,6 @@ public class FileUploadDownloadClient implements Closeable { 
DEFAULT_SOCKET_TIMEOUT_MS); } - private static HttpUriRequest getUpdateSchemaRequest(URI uri, String schemaName, File schemaFile) { - return getUploadFileRequest(HttpPut.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null, - DEFAULT_SOCKET_TIMEOUT_MS); - } - private static HttpUriRequest getUploadSegmentRequest(URI uri, String segmentName, File segmentFile, @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(segmentName, segmentFile), headers, @@ -299,7 +265,7 @@ public class FileUploadDownloadClient implements Closeable { } private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, Map<String, File> metadataFiles, - int segmentUploadRequestTimeoutMs) { + @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int segmentUploadRequestTimeoutMs) { MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create(). setMode(HttpMultipartMode.BROWSER_COMPATIBLE); for (Map.Entry<String, File> entry : metadataFiles.entrySet()) { @@ -310,6 +276,7 @@ public class FileUploadDownloadClient implements Closeable { // Build the POST request. 
RequestBuilder requestBuilder = RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity); + addHeadersAndParameters(requestBuilder, headers, parameters); setTimeout(requestBuilder, segmentUploadRequestTimeoutMs); return requestBuilder.build(); } @@ -334,16 +301,13 @@ public class FileUploadDownloadClient implements Closeable { return requestBuilder.build(); } - private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, int socketTimeoutMs) { + private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, @Nullable List<Header> headers, + @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); setTimeout(requestBuilder, socketTimeoutMs); return requestBuilder.build(); } - private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) { - return getDownloadFileRequest(uri, socketTimeoutMs, null); - } - private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); if (StringUtils.isNotBlank(authToken)) { @@ -431,11 +395,13 @@ public class FileUploadDownloadClient implements Closeable { return errorMessage; } + @Deprecated public SimpleHttpResponse sendGetRequest(URI uri) throws IOException, HttpErrorStatusException { return sendRequest(constructGetRequest(uri)); } + @Deprecated public SimpleHttpResponse sendDeleteRequest(URI uri) throws IOException, HttpErrorStatusException { return sendRequest(constructDeleteRequest(uri)); @@ -481,9 +447,10 @@ public class FileUploadDownloadClient implements Closeable { // Upload a set of segment metadata files (e.g., meta.properties and creation.meta) to controllers. 
public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, File> metadataFiles, - int segmentUploadRequestTimeoutMs) + @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int segmentUploadRequestTimeoutMs) throws IOException, HttpErrorStatusException { - return sendRequest(getUploadSegmentMetadataFilesRequest(uri, metadataFiles, segmentUploadRequestTimeoutMs)); + return sendRequest( + getUploadSegmentMetadataFilesRequest(uri, metadataFiles, headers, parameters, segmentUploadRequestTimeoutMs)); } /** @@ -513,6 +480,8 @@ public class FileUploadDownloadClient implements Closeable { /** * Upload segment with segment file using default settings. Include table name as a request parameter. * + * NOTE: does not support auth tokens + * * @param uri URI * @param segmentName Segment name * @param segmentFile Segment file @@ -521,6 +490,7 @@ public class FileUploadDownloadClient implements Closeable { * @throws IOException * @throws HttpErrorStatusException */ + @Deprecated public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName) throws IOException, HttpErrorStatusException { // Add table name as a request parameter @@ -580,6 +550,7 @@ public class FileUploadDownloadClient implements Closeable { * @throws IOException * @throws HttpErrorStatusException */ + @Deprecated public SimpleHttpResponse sendSegmentUri(URI uri, String downloadUri, String rawTableName) throws IOException, HttpErrorStatusException { // Add table name as a request parameter @@ -615,6 +586,7 @@ public class FileUploadDownloadClient implements Closeable { * @throws IOException * @throws HttpErrorStatusException */ + @Deprecated public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString) throws IOException, HttpErrorStatusException { return sendSegmentJson(uri, jsonString, null, null, DEFAULT_SOCKET_TIMEOUT_MS); @@ -629,24 +601,10 @@ public class FileUploadDownloadClient implements Closeable { * @throws IOException * 
@throws HttpErrorStatusException */ - public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, int socketTimeoutMs) - throws IOException, HttpErrorStatusException { - return sendRequest(getSegmentCompletionProtocolRequest(uri, socketTimeoutMs)); - } - - /** - * Download a file using default settings. - * - * @param uri URI - * @param socketTimeoutMs Socket timeout in milliseconds - * @param dest File destination - * @return Response status code - * @throws IOException - * @throws HttpErrorStatusException - */ - public int downloadFile(URI uri, int socketTimeoutMs, File dest) + public SimpleHttpResponse sendSegmentCompletionProtocolRequest(URI uri, @Nullable List<Header> headers, + @Nullable List<NameValuePair> parameters, int socketTimeoutMs) throws IOException, HttpErrorStatusException { - return downloadFile(uri, socketTimeoutMs, dest, null); + return sendRequest(getSegmentCompletionProtocolRequest(uri, headers, parameters, socketTimeoutMs)); } /** @@ -690,20 +648,6 @@ public class FileUploadDownloadClient implements Closeable { } /** - * Download a file. - * - * @param uri URI - * @param dest File destination - * @return Response status code - * @throws IOException - * @throws HttpErrorStatusException - */ - public int downloadFile(URI uri, File dest) - throws IOException, HttpErrorStatusException { - return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest); - } - - /** * Download a file, with an optional auth token. 
* * @param uri URI diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index d9d1a9d..c2ce030 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -29,6 +29,7 @@ public class MinionConstants { public static final String DOWNLOAD_URL_KEY = "downloadURL"; public static final String UPLOAD_URL_KEY = "uploadURL"; public static final String URL_SEPARATOR = ","; + public static final String AUTH_TOKEN = "authToken"; /** * When minion downloads a segment to do work on, we will save that CRC. We will send that to the controller in an diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 1381e61..a1c1fc3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -55,6 +55,7 @@ public abstract class BaseTableDataManager implements TableDataManager { protected File _indexDir; protected Logger _logger; protected HelixManager _helixManager; + protected String _authToken; @Override public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, @@ -66,6 +67,7 @@ public abstract class BaseTableDataManager implements TableDataManager { _propertyStore = propertyStore; _serverMetrics = serverMetrics; _helixManager = helixManager; + _authToken = tableDataManagerConfig.getAuthToken(); _tableNameWithType = tableDataManagerConfig.getTableName(); _tableDataDir = tableDataManagerConfig.getDataDir(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java index 6422140..cf2aeb9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java @@ -50,4 +50,6 @@ public interface InstanceDataManagerConfig { boolean isDirectRealtimeOffHeapAllocation(); int getMaxParallelSegmentBuilds(); + + String getAuthToken(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java index ca0de52..577d795 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import javax.annotation.Nonnull; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -36,6 +37,7 @@ public class TableDataManagerConfig { private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY = "consumerDirectory"; private static final String TABLE_DATA_MANAGER_NAME = "name"; private static final String TABLE_IS_DIMENSION = "isDimTable"; + private static final String TABLE_DATA_MANGER_AUTH_TOKEN = "authToken"; private final Configuration _tableDataManagerConfig; @@ -67,6 +69,10 @@ public class TableDataManagerConfig { return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION); } + public String getAuthToken() { + return 
_tableDataManagerConfig.getString(TABLE_DATA_MANGER_AUTH_TOKEN); + } + public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig( @Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull String tableNameWithType) { Configuration defaultConfig = new PropertiesConfiguration(); @@ -77,14 +83,16 @@ public class TableDataManagerConfig { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); Preconditions.checkNotNull(tableType); defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name()); + defaultConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, instanceDataManagerConfig.getAuthToken()); return new TableDataManagerConfig(defaultConfig); } - public void overrideConfigs(@Nonnull TableConfig tableConfig) { + public void overrideConfigs(@Nonnull TableConfig tableConfig, String authToken) { // Override table level configs _tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION, tableConfig.isDimTable()); + _tableDataManagerConfig.addProperty(TABLE_DATA_MANGER_AUTH_TOKEN, authToken); // If we wish to override some table level configs using table config, override them here // Note: the configs in TableDataManagerConfig is immutable once the table is created, which mean it will not pick diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 2d8154e..9cb5704 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -63,7 +63,8 @@ public class SegmentCommitterFactory { segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(), _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(), - 
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics); + ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics, + _protocolHandler.getAuthToken()); return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java index 35084aa..ac46ee8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java @@ -39,10 +39,11 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { private final String _segmentName; private final int _segmentUploadRequestTimeoutMs; private final ServerMetrics _serverMetrics; + private final String _authToken; public Server2ControllerSegmentUploader(Logger segmentLogger, FileUploadDownloadClient fileUploadDownloadClient, String controllerSegmentUploadCommitUrl, String segmentName, int segmentUploadRequestTimeoutMs, - ServerMetrics serverMetrics) + ServerMetrics serverMetrics, String authToken) throws URISyntaxException { _segmentLogger = segmentLogger; _fileUploadDownloadClient = fileUploadDownloadClient; @@ -50,10 +51,11 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { _segmentName = segmentName; _segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs; _serverMetrics = serverMetrics; + _authToken = authToken; } @Override - public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { + public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile); if (response.getStatus() == 
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) { try { @@ -69,8 +71,8 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { SegmentCompletionProtocol.Response response; try { String responseStr = _fileUploadDownloadClient - .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile, null, null, - _segmentUploadRequestTimeoutMs).getResponse(); + .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile, + FileUploadDownloadClient.makeAuthHeader(_authToken), null, _segmentUploadRequestTimeoutMs).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); _segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl); if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java index c419105..5f24486 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java @@ -44,6 +44,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol.*; + /** * A class that handles sending segment completion protocol requests to the controller and getting @@ -58,6 +60,7 @@ public class ServerSegmentCompletionProtocolHandler { private static SSLContext _sslContext; private static Integer _controllerHttpsPort; private static int _segmentUploadRequestTimeoutMs; + private static String _authToken; private final FileUploadDownloadClient _fileUploadDownloadClient; private 
final ServerMetrics _serverMetrics; @@ -69,8 +72,9 @@ public class ServerSegmentCompletionProtocolHandler { _sslContext = new ClientSSLContextGenerator(httpsConfig.subset(CommonConstants.PREFIX_OF_SSL_SUBSET)).generate(); _controllerHttpsPort = httpsConfig.getProperty(CONFIG_OF_CONTROLLER_HTTPS_PORT, Integer.class); } - _segmentUploadRequestTimeoutMs = - uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS); + _segmentUploadRequestTimeoutMs = uploaderConfig + .getProperty(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS); + _authToken = uploaderConfig.getProperty(CONFIG_OF_SEGMENT_UPLOAD_AUTH_TOKEN); } public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) { @@ -87,6 +91,10 @@ public class ServerSegmentCompletionProtocolHandler { return _fileUploadDownloadClient; } + public String getAuthToken() { + return _authToken; + } + public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) { SegmentCompletionProtocol.SegmentCommitStartRequest request = new SegmentCompletionProtocol.SegmentCommitStartRequest(params); @@ -146,10 +154,11 @@ public class ServerSegmentCompletionProtocolHandler { return SegmentCompletionProtocol.RESP_NOT_SENT; } - Server2ControllerSegmentUploader segmentUploader= null; + Server2ControllerSegmentUploader segmentUploader = null; try { - segmentUploader = new Server2ControllerSegmentUploader(LOGGER, - _fileUploadDownloadClient, url, params.getSegmentName(), _segmentUploadRequestTimeoutMs, _serverMetrics); + segmentUploader = + new Server2ControllerSegmentUploader(LOGGER, _fileUploadDownloadClient, url, params.getSegmentName(), + _segmentUploadRequestTimeoutMs, _serverMetrics, _authToken); } catch (URISyntaxException e) { LOGGER.error("Segment commit upload url error: ", e); return SegmentCompletionProtocol.RESP_NOT_SENT; @@ -203,9 +212,9 @@ public class 
ServerSegmentCompletionProtocolHandler { private SegmentCompletionProtocol.Response sendRequest(String url) { SegmentCompletionProtocol.Response response; try { - String responseStr = - _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new URI(url), DEFAULT_OTHER_REQUESTS_TIMEOUT) - .getResponse(); + String responseStr = _fileUploadDownloadClient + .sendSegmentCompletionProtocolRequest(new URI(url), FileUploadDownloadClient.makeAuthHeader(_authToken), null, + DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); LOGGER.info("Controller response {} for {}", response.toJsonString(), url); if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { @@ -228,7 +237,8 @@ public class ServerSegmentCompletionProtocolHandler { SegmentCompletionProtocol.Response response; try { String responseStr = _fileUploadDownloadClient - .uploadSegmentMetadataFiles(new URI(url), metadataFiles, _segmentUploadRequestTimeoutMs).getResponse(); + .uploadSegmentMetadataFiles(new URI(url), metadataFiles, FileUploadDownloadClient.makeAuthHeader(_authToken), + null, _segmentUploadRequestTimeoutMs).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); LOGGER.info("Controller response {} for {}", response.toJsonString(), url); if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java index 85f84f1..86a7088 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java @@ -91,7 +91,7 @@ public class 
Server2ControllerSegmentUploaderTest { throws URISyntaxException { Server2ControllerSegmentUploader uploader = new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName", - 10000, mock(ServerMetrics.class)); + 10000, mock(ServerMetrics.class), null); URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName); Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION); } @@ -101,7 +101,7 @@ public class Server2ControllerSegmentUploaderTest { throws URISyntaxException { Server2ControllerSegmentUploader uploader = new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName", - 10000, mock(ServerMetrics.class)); + 10000, mock(ServerMetrics.class), null); URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName); Assert.assertNull(segmentURI); } diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java index 35e58fa..6ba1d67 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java @@ -45,6 +45,7 @@ public class MinionContext { // For segment upload private SSLContext _sslContext; + private String _taskAuthToken; // For PurgeTask private SegmentPurger.RecordPurgerFactory _recordPurgerFactory; @@ -97,4 +98,12 @@ public class MinionContext { public void setRecordModifierFactory(SegmentPurger.RecordModifierFactory recordModifierFactory) { _recordModifierFactory = recordModifierFactory; } + + public String getTaskAuthToken() { + return _taskAuthToken; + } + + public void setTaskAuthToken(String taskAuthToken) { + _taskAuthToken = taskAuthToken; + } } diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java index 015b8e2..37aa51d 100644 --- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java @@ -160,6 +160,9 @@ public class MinionStarter implements ServiceStartable { minionMetrics.initializeGlobalMeters(); minionContext.setMinionMetrics(minionMetrics); + // initialize authentication + minionContext.setTaskAuthToken(_config.getProperty(CommonConstants.Minion.CONFIG_OF_TASK_AUTH_TOKEN)); + // Start all components LOGGER.info("Initializing PinotFSFactory"); PinotConfiguration pinotFSConfig = _config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java index ee7be9e..f9e7ebe 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java @@ -88,6 +88,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); + String authToken = configs.get(MinionConstants.AUTH_TOKEN); LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); @@ -149,8 +150,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe TableNameBuilder.extractRawTableName(tableNameWithType)); List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter); - SegmentConversionUtils.uploadSegment(configs, null, parameters, 
tableNameWithType, resultSegmentName, uploadURL, - convertedTarredSegmentFile); + SegmentConversionUtils + .uploadSegment(configs, FileUploadDownloadClient.makeAuthHeader(authToken), parameters, tableNameWithType, + resultSegmentName, uploadURL, convertedTarredSegmentFile); } String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName) diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java index 0b94a23..4f9d779 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java @@ -20,6 +20,7 @@ package org.apache.pinot.minion.executor; import com.google.common.base.Preconditions; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -72,6 +73,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY); String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); String originalSegmentCrc = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY); + String authToken = configs.get(MinionConstants.AUTH_TOKEN); LOGGER.info("Start executing {} on table: {}, segment: {} with downloadURL: {}, uploadURL: {}", taskType, tableNameWithType, segmentName, downloadURL, uploadURL); @@ -121,7 +123,10 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, segmentZKMetadataCustomMapModifier.toJsonString()); - List<Header> httpHeaders = Arrays.asList(ifMatchHeader, segmentZKMetadataCustomMapModifierHeader); + List<Header> httpHeaders = new 
ArrayList<>(); + httpHeaders.add(ifMatchHeader); + httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); + httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken)); // Set parameters for upload request. NameValuePair enableParallelPushProtectionParameter = diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java index ce5c840..b8adc80 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java @@ -25,6 +25,7 @@ import org.apache.helix.task.Task; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskResult; +import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.minion.MinionContext; import org.apache.pinot.minion.event.EventObserverFactoryRegistry; @@ -71,6 +72,9 @@ public class TaskFactoryRegistry { MinionMetrics minionMetrics = MinionContext.getInstance().getMinionMetrics(); PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(_taskConfig); + pinotTaskConfig.getConfigs() + .put(MinionConstants.AUTH_TOKEN, MinionContext.getInstance().getTaskAuthToken()); + _eventObserver.notifyTaskStart(pinotTaskConfig); minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_EXECUTED, 1L); LOGGER.info("Start running {}: {} with configs: {}", pinotTaskConfig.getTaskType(), _taskConfig.getId(), diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index 5cc69ba..43efce6 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -19,12 +19,12 @@ package org.apache.pinot.plugin.ingestion.batch.common; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.io.File; import java.io.InputStream; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -33,9 +33,7 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; -import org.apache.http.NameValuePair; import org.apache.http.message.BasicHeader; -import org.apache.http.message.BasicNameValuePair; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; @@ -177,7 +175,10 @@ public class SegmentPushUtils implements Serializable { RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { try { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT - .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri, tableName); + .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentUri, + FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), + FileUploadDownloadClient.makeTableParam(tableName), + FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment uri {} to location {} - {}: {}", tableName, segmentUri, controllerURI, response.getStatusCode(), response.getResponse()); return true; @@ -248,17 +249,18 @@ public 
class SegmentPushUtils implements Serializable { } RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { try { - List<Header> headers = ImmutableList - .of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath), - new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, - FileUploadDownloadClient.FileUploadType.METADATA.toString())); - // Add table name as a request parameter - NameValuePair tableNameValuePair = - new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName); - List<NameValuePair> parameters = Arrays.asList(tableNameValuePair); + List<Header> headers = new ArrayList<>(); + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath)); + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); + if (StringUtils.isNotBlank(spec.getAuthToken())) { + headers.addAll(FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken())); + } + SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, - segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS); + segmentMetadataFile, headers, FileUploadDownloadClient.makeTableParam(tableName), + FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, controllerURI, response.getStatusCode(), response.getResponse()); return true; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java index 391d33b..43db07d 
100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/common/DefaultControllerRestApi.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -118,8 +117,7 @@ public class DefaultControllerRestApi implements ControllerRestApi { try (InputStream inputStream = fileSystem.open(tarFilePath)) { SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment( FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()), - segmentName, inputStream, Collections.emptyList(), - FileUploadDownloadClient.makeTableParam(_rawTableName), + segmentName, inputStream, null, FileUploadDownloadClient.makeTableParam(_rawTableName), FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); break; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 162ba71..7cfc19b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -82,6 +82,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { _instanceId = _instanceDataManagerConfig.getInstanceId(); _helixManager = helixManager; _serverMetrics = serverMetrics; + _authToken = config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN); File 
instanceDataDir = new File(_instanceDataManagerConfig.getInstanceDataDir()); if (!instanceDataDir.exists()) { @@ -138,7 +139,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { LOGGER.info("Creating table data manager for table: {}", tableNameWithType); TableDataManagerConfig tableDataManagerConfig = TableDataManagerConfig.getDefaultHelixTableDataManagerConfig(_instanceDataManagerConfig, tableNameWithType); - tableDataManagerConfig.overrideConfigs(tableConfig); + tableDataManagerConfig.overrideConfigs(tableConfig, _authToken); TableDataManager tableDataManager = TableDataManagerProvider .getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore, _serverMetrics, _helixManager); tableDataManager.start(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 41c2744..0e4f60f 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix; import java.util.Optional; import org.apache.commons.configuration.ConfigurationException; +import org.apache.http.auth.AUTH; import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; @@ -60,6 +61,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig public static final String SEGMENT_FORMAT_VERSION = "segment.format.version"; // Key of whether to enable reloading consuming segments public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment"; + // Key of the auth token + public static final String AUTH_TOKEN = "auth.token"; // 
Key of how many parallel realtime segments can be built. // A value of <= 0 indicates unlimited. @@ -195,6 +198,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig } @Override + public String getAuthToken() { + return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN); + } + + @Override public String toString() { String configString = ""; configString += "Instance Id: " + getInstanceId(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java index 7d230ea..254a7be 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java @@ -152,7 +152,10 @@ public class BackfillSegmentUtils { /** * Uploads the segment tar to the controller. + * + * NOTE: this method does not support auth tokens */ + @Deprecated public boolean uploadSegment(String rawTableName, String segmentName, File segmentDir, File outputDir) { boolean success = true; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java index 17338af..047faca 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java @@ -178,6 +178,8 @@ public class PinotConfigUtils { properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir); properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir); properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); + properties.put("pinot.server.segment.upload.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); + properties.put("pinot.server.instance.auth.token", "Basic 
YWRtaW46dmVyeXNlY3JldA=="); return properties; } @@ -191,6 +193,7 @@ public class PinotConfigUtils { properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost); properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort()); properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); + properties.put("task.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); return properties; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org