This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch basic-auth-controller in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 0f47b6097ba4ece74038eaa6436beda157fbb524 Author: Alexander Pucher <a...@alexpucher.com> AuthorDate: Wed Feb 10 18:45:36 2021 -0800 tokens everywhere --- .../broker/BasicAuthAccessControlFactory.java | 95 +++-------------- .../segment/generation/SegmentGenerationUtils.java | 43 ++++++-- .../common/utils/FileUploadDownloadClient.java | 54 +++++++++- .../common/utils/fetcher/BaseSegmentFetcher.java | 4 +- .../common/utils/fetcher/HttpSegmentFetcher.java | 4 +- .../utils/fetcher/SegmentFetcherFactory.java | 69 +++++++++---- .../api/access/BasicAuthAccessControlFactory.java | 112 ++++++--------------- .../apache/pinot/core/auth/BasicAuthPrincipal.java | 37 +++++++ .../org/apache/pinot/core/auth/BasicAuthUtils.java | 100 ++++++++++++++++++ .../SegmentGenerationAndPushTaskExecutor.java | 7 +- .../ingestion/batch/common/SegmentPushUtils.java | 5 +- .../batch/hadoop/HadoopSegmentCreationMapper.java | 14 +-- .../spark/SparkSegmentGenerationJobRunner.java | 23 +++-- .../standalone/SegmentGenerationJobRunner.java | 8 +- .../starter/helix/HelixInstanceDataManager.java | 1 + .../spi/ingestion/batch/BatchConfigProperties.java | 1 + .../org/apache/pinot/tools/BootstrapTableTool.java | 4 +- .../admin/command/AbstractBaseAdminCommand.java | 41 ++++++-- .../tools/admin/command/AddSchemaCommand.java | 25 +++-- .../pinot/tools/admin/command/AddTableCommand.java | 21 ++-- .../tools/admin/command/AddTenantCommand.java | 10 +- .../tools/admin/command/BootstrapTableCommand.java | 7 +- .../tools/admin/command/ChangeTableState.java | 10 +- .../tools/admin/command/ImportDataCommand.java | 9 +- .../admin/command/OperateClusterConfigCommand.java | 30 ++++-- .../tools/admin/command/PostQueryCommand.java | 16 ++- .../tools/admin/command/QuickstartRunner.java | 4 +- .../tools/admin/command/UploadSegmentCommand.java | 10 +- .../apache/pinot/tools/utils/PinotConfigUtils.java | 3 + 29 files changed, 499 insertions(+), 268 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java index 3a670b3..0d00642 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java @@ -19,16 +19,18 @@ package org.apache.pinot.broker.broker; import com.google.common.base.Preconditions; -import javax.annotation.Nullable; -import org.apache.commons.lang3.StringUtils; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; import org.apache.pinot.broker.api.AccessControl; import org.apache.pinot.broker.api.HttpRequesterIdentity; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.core.auth.BasicAuthPrincipal; +import org.apache.pinot.core.auth.BasicAuthUtils; import org.apache.pinot.spi.env.PinotConfiguration; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.stream.Collectors; /** @@ -43,10 +45,7 @@ import java.util.stream.Collectors; * </pre> */ public class BasicAuthAccessControlFactory extends AccessControlFactory { - private static final String PRINCIPALS = "principals"; - private static final String PASSWORD = "password"; - private static final String TABLES = "tables"; - private static final String TABLES_ALL = "*"; + private static final String PREFIX = "principals"; private static final String HEADER_AUTHORIZATION = "authorization"; @@ -57,26 +56,7 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory { } public void init(PinotConfiguration configuration) { - String principalNames = configuration.getProperty(PRINCIPALS); - Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals"); - - List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> { - String name = rawName.trim(); - Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name); - - String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD)); - Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name); - - Set<String> tables = new HashSet<>(); - String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES)); - if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) { - tables.addAll(Arrays.asList(tableNames.split(","))); - } - - return new BasicAuthPrincipal(name, toToken(name, password), tables); - }).collect(Collectors.toList()); - - _accessControl = new BasicAuthAccessControl(principals); + _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX)); } public AccessControl create() { @@ -100,8 +80,8 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory { Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION); Optional<BasicAuthPrincipal> principalOpt = - tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken).map(_principals::get) - .filter(Objects::nonNull).findFirst(); + tokens.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get).filter(Objects::nonNull) + .findFirst(); if (!principalOpt.isPresent()) { // no matching token? reject @@ -109,61 +89,12 @@ public class BasicAuthAccessControlFactory extends AccessControlFactory { } BasicAuthPrincipal principal = principalOpt.get(); - if (principal.getTables().isEmpty() || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource() - .isSetTableName()) { + if (!brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource().isSetTableName()) { // no table restrictions? accept return true; } - return principal.getTables().contains(brokerRequest.getQuerySource().getTableName()); - } - } - - /** - * Container object for basic auth principal - */ - private static class BasicAuthPrincipal { - private final String _name; - private final String _token; - private final Set<String> _tables; - - public BasicAuthPrincipal(String name, String token, Set<String> tables) { - _name = name; - _token = token; - _tables = tables; - } - - public String getName() { - return _name; - } - - public Set<String> getTables() { - return _tables; - } - - public String getToken() { - return _token; - } - } - - private static String toToken(String name, String password) { - String identifier = String.format("%s:%s", name, password); - return normalizeToken( - String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes(StandardCharsets.UTF_8)))); - } - - /** - * Implementations of base64 encoding vary and may generate different numbers of padding characters "=". We normalize - * these by removing any padding. - * - * @param token raw token - * @return normalized token - */ - @Nullable - private static String normalizeToken(String token) { - if (token == null) { - return null; + return principal.hasTable(brokerRequest.getQuerySource().getTableName()); } - return StringUtils.remove(token.trim(), '='); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java index 3c0c06e..588d9fd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java @@ -26,8 +26,11 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; import java.nio.charset.StandardCharsets; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.filesystem.PinotFS; @@ -50,6 +53,10 @@ public class SegmentGenerationUtils { } public static Schema getSchema(String schemaURIString) { + return getSchema(schemaURIString, null); + } + + public static Schema getSchema(String schemaURIString, String authToken) { URI schemaURI; try { schemaURI = new URI(schemaURIString); @@ -75,7 +82,7 @@ public class SegmentGenerationUtils { } else { // Try to directly read from URI. try { - schemaJson = IOUtils.toString(schemaURI, StandardCharsets.UTF_8); + schemaJson = fetchUrl(schemaURI.toURL(), authToken); } catch (IOException e) { throw new RuntimeException("Failed to read from Schema URI - '" + schemaURI + "'", e); } @@ -88,6 +95,10 @@ public class SegmentGenerationUtils { } public static TableConfig getTableConfig(String tableConfigURIStr) { + return getTableConfig(tableConfigURIStr, null); + } + + public static TableConfig getTableConfig(String tableConfigURIStr, String authToken) { URI tableConfigURI; try { tableConfigURI = new URI(tableConfigURIStr); @@ -106,7 +117,7 @@ public class SegmentGenerationUtils { } } else { try { - tableConfigJson = IOUtils.toString(tableConfigURI, StandardCharsets.UTF_8); + tableConfigJson = fetchUrl(tableConfigURI.toURL(), authToken); } catch (IOException e) { throw new RuntimeException( "Failed to read from table config file data stream on Pinot fs - '" + tableConfigURI + "'", e); @@ -142,7 +153,8 @@ public class SegmentGenerationUtils { public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { URI relativePath = baseInputDir.relativize(inputFile); Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), - "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: " + baseInputDir); + "Unable to extract out the relative path for input file '" + inputFile + "', based on base input path: " + + baseInputDir); String outputDirStr = outputDir.toString(); outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir; URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); @@ -176,10 +188,11 @@ public class SegmentGenerationUtils { throws URISyntaxException { URI fileURI = URI.create(uriStr); if (fileURI.getScheme() == null) { - return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(), fullUriForPathOnlyUriStr.getHost(), - fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment()); + return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getUserInfo(), + fullUriForPathOnlyUriStr.getHost(), fullUriForPathOnlyUriStr.getPort(), fileURI.getPath(), fileURI.getQuery(), + fileURI.getFragment()); } - + return fileURI; } @@ -198,4 +211,22 @@ public class SegmentGenerationUtils { } return uri; } + + /** + * Retrieve a URL via GET request, with an optional authorization token. + * + * @param url target url + * @param authToken optional auth token, or null + * @return fetched document + * @throws IOException on connection problems + */ + private static String fetchUrl(URL url, String authToken) + throws IOException { + URLConnection connection = url.openConnection(); + + if (StringUtils.isNotBlank(authToken)) { + connection.setRequestProperty("Authorization", authToken); + } + return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 4b12e2a..401b036 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -58,6 +58,7 @@ import org.apache.http.entity.mime.content.FileBody; import org.apache.http.entity.mime.content.InputStreamBody; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; @@ -340,7 +341,14 @@ public class FileUploadDownloadClient implements Closeable { } private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs) { + return getDownloadFileRequest(uri, socketTimeoutMs, null); + } + + private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); + if (StringUtils.isNotBlank(authToken)) { + requestBuilder.addHeader("Authorization", authToken); + } setTimeout(requestBuilder, socketTimeoutMs); String userInfo = uri.getUserInfo(); if (userInfo != null) { @@ -687,7 +695,23 @@ public class FileUploadDownloadClient implements Closeable { */ public int downloadFile(URI uri, int socketTimeoutMs, File dest) throws IOException, HttpErrorStatusException { - HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs); + return downloadFile(uri, socketTimeoutMs, dest, null); + } + + /** + * Download a file using default settings, with an optional auth token + * + * @param uri URI + * @param socketTimeoutMs Socket timeout in milliseconds + * @param dest File destination + * @param authToken optional auth token, or null + * @return Response status code + * @throws IOException + * @throws HttpErrorStatusException + */ + public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken) + throws IOException, HttpErrorStatusException { + HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken); try (CloseableHttpResponse response = _httpClient.execute(request)) { StatusLine statusLine = response.getStatusLine(); int statusCode = statusLine.getStatusCode(); @@ -728,6 +752,21 @@ public class FileUploadDownloadClient implements Closeable { return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest); } + /** + * Download a file, with an optional auth token. + * + * @param uri URI + * @param dest File destination + * @param authToken optional auth token, or null + * @return Response status code + * @throws IOException + * @throws HttpErrorStatusException + */ + public int downloadFile(URI uri, File dest, String authToken) + throws IOException, HttpErrorStatusException { + return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken); + } + @Override public void close() throws IOException { @@ -742,4 +781,17 @@ public class FileUploadDownloadClient implements Closeable { public static void installDefaultSSLContext(SSLContext sslContext) { _defaultSSLContext = sslContext; } + + /** + * Generate an (optional) HTTP Authorization header given an auth token + * + * @param authToken auth token + * @return list of 0 or 1 "Authorization" headers + */ + public static List<Header> makeAuthHeader(String authToken) { + if (org.apache.commons.lang3.StringUtils.isBlank(authToken)) { + return Collections.emptyList(); + } + return Collections.singletonList(new BasicHeader("Authorization", authToken)); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java index e4a00c9..1988edb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java @@ -22,7 +22,6 @@ import java.io.File; import java.net.URI; import java.util.List; import java.util.Random; - import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; @@ -36,6 +35,7 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { public static final String RETRY_COUNT_CONFIG_KEY = "retry.count"; public static final String RETRY_WAIT_MS_CONFIG_KEY = "retry.wait.ms"; public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor"; + public static final String AUTH_TOKEN = "auth.token"; public static final int DEFAULT_RETRY_COUNT = 3; public static final int DEFAULT_RETRY_WAIT_MS = 100; public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5; @@ -45,12 +45,14 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { protected int _retryCount; protected int _retryWaitMs; protected int _retryDelayScaleFactor; + protected String _authToken; @Override public void init(PinotConfiguration config) { _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT); _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS); _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR); + _authToken = config.getProperty(AUTH_TOKEN); doInit(config); _logger .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs, diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index fcaee6c..6d6d25e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -41,7 +41,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { throws Exception { RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> { try { - int statusCode = _httpClient.downloadFile(uri, dest); + int statusCode = _httpClient.downloadFile(uri, dest, _authToken); _logger .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), statusCode); @@ -70,7 +70,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { public void fetchSegmentToLocalWithoutRetry(URI uri, File dest) throws Exception { try { - int statusCode = _httpClient.downloadFile(uri, dest); + int statusCode = _httpClient.downloadFile(uri, dest, _authToken); _logger .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), statusCode); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java index 05b14d8..83d98e4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; @@ -34,22 +34,25 @@ import org.slf4j.LoggerFactory; public class SegmentFetcherFactory { - private SegmentFetcherFactory() { - } + private final static SegmentFetcherFactory instance = new SegmentFetcherFactory(); static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class"; private static final String PROTOCOLS_KEY = "protocols"; + private static final String AUTH_TOKEN_KEY = "auth.token"; private static final String ENCODED_SUFFIX = ".enc"; private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class); - private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new HashMap<>(); - private static final SegmentFetcher DEFAULT_HTTP_SEGMENT_FETCHER = new HttpSegmentFetcher(); - private static final SegmentFetcher DEFAULT_PINOT_FS_SEGMENT_FETCHER = new PinotFSSegmentFetcher(); - - static { - PinotConfiguration emptyConfig = new PinotConfiguration(); - DEFAULT_HTTP_SEGMENT_FETCHER.init(emptyConfig); - DEFAULT_PINOT_FS_SEGMENT_FETCHER.init(emptyConfig); + + private final Map<String, SegmentFetcher> _segmentFetcherMap = new HashMap<>(); + private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher(); + private final SegmentFetcher _pinotFSSegmentFetcher = new PinotFSSegmentFetcher(); + + private SegmentFetcherFactory() { + // left blank + } + + public static SegmentFetcherFactory getInstance() { + return instance; } /** @@ -57,6 +60,14 @@ public class SegmentFetcherFactory { */ public static void init(PinotConfiguration config) throws Exception { + getInstance().initInternal(config); + } + + private void initInternal(PinotConfiguration config) + throws Exception { + _httpSegmentFetcher.init(config); // directly, without sub-namespace + _pinotFSSegmentFetcher.init(config); // directly, without sub-namespace + List<String> protocols = config.getProperty(PROTOCOLS_KEY, Arrays.asList()); for (String protocol : protocols) { String segmentFetcherClassName = config.getProperty(protocol + SEGMENT_FETCHER_CLASS_KEY_SUFFIX); @@ -77,8 +88,16 @@ public class SegmentFetcherFactory { LOGGER.info("Creating segment fetcher for protocol: {} with class: {}", protocol, segmentFetcherClassName); segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).newInstance(); } - segmentFetcher.init(config.subset(protocol)); - SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher); + + String authToken = config.getProperty(AUTH_TOKEN_KEY); + Map<String, Object> subConfigMap = config.subset(protocol).toMap(); + if (!subConfigMap.containsKey(AUTH_TOKEN_KEY) && StringUtils.isNotBlank(authToken)) { + subConfigMap.put(AUTH_TOKEN_KEY, authToken); + } + + segmentFetcher.init(new PinotConfiguration(subConfigMap)); + + _segmentFetcherMap.put(protocol, segmentFetcher); } } @@ -87,7 +106,11 @@ public class SegmentFetcherFactory { * ({@link HttpSegmentFetcher} for "http" and "https", {@link PinotFSSegmentFetcher} for other protocols). */ public static SegmentFetcher getSegmentFetcher(String protocol) { - SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol); + return getInstance().getSegmentFetcherInternal(protocol); + } + + private SegmentFetcher getSegmentFetcherInternal(String protocol) { + SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol); if (segmentFetcher != null) { return segmentFetcher; } else { @@ -95,9 +118,9 @@ public class SegmentFetcherFactory { switch (protocol) { case CommonConstants.HTTP_PROTOCOL: case CommonConstants.HTTPS_PROTOCOL: - return DEFAULT_HTTP_SEGMENT_FETCHER; + return _httpSegmentFetcher; default: - return DEFAULT_PINOT_FS_SEGMENT_FETCHER; + return _pinotFSSegmentFetcher; } } } @@ -107,7 +130,7 @@ public class SegmentFetcherFactory { */ public static void fetchSegmentToLocal(URI uri, File dest) throws Exception { - getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest); + getInstance().fetchSegmentToLocalInternal(uri, dest); } /** @@ -115,7 +138,12 @@ public class SegmentFetcherFactory { */ public static void fetchSegmentToLocal(String uri, File dest) throws Exception { - fetchSegmentToLocal(new URI(uri), dest); + getInstance().fetchSegmentToLocalInternal(new URI(uri), dest); + } + + private void fetchSegmentToLocalInternal(URI uri, File dest) + throws Exception { + getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest); } /** @@ -125,6 +153,11 @@ public class SegmentFetcherFactory { */ public static void fetchAndDecryptSegmentToLocal(String uri, File dest, String crypterName) throws Exception { + getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, crypterName); + } + + private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String crypterName) + throws Exception { if (crypterName == null) { fetchSegmentToLocal(uri, dest); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java index 0b2c00a..d8cf80b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java @@ -18,13 +18,16 @@ */ package org.apache.pinot.controller.api.access; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.spi.env.PinotConfiguration; - -import javax.ws.rs.core.HttpHeaders; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; +import javax.ws.rs.core.HttpHeaders; +import org.apache.pinot.core.auth.BasicAuthPrincipal; +import org.apache.pinot.core.auth.BasicAuthUtils; +import org.apache.pinot.spi.env.PinotConfiguration; /** @@ -41,43 +44,13 @@ import java.util.stream.Collectors; */ public class BasicAuthAccessControlFactory implements AccessControlFactory { private static final String PREFIX = "controller.admin.access.control.principals"; - private static final String PASSWORD = "password"; - private static final String PERMISSIONS = "permissions"; - private static final String TABLES = "tables"; - private static final String ALL = "*"; private static final String HEADER_AUTHORIZATION = "Authorization"; private AccessControl _accessControl; public void init(PinotConfiguration configuration) { - String principalNames = configuration.getProperty(PREFIX); - Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals"); - - List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> { - String name = rawName.trim(); - Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name); - - String password = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PASSWORD)); - Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name); - - Set<String> tables = new HashSet<>(); - String tableNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, TABLES)); - if (StringUtils.isNotBlank(tableNames) && !ALL.equals(tableNames)) { - tables = Arrays.stream(tableNames.split(",")).map(String::trim).collect(Collectors.toSet()); - } - - Set<AccessType> permissions = new HashSet<>(); - String permissionNames = configuration.getProperty(String.format("%s.%s.%s", PREFIX, name, PERMISSIONS)); - if (StringUtils.isNotBlank(permissionNames) && !ALL.equals(tableNames)) { - permissions = Arrays.stream(permissionNames.split(",")).map(String::trim).map(String::toUpperCase) - .map(AccessType::valueOf).collect(Collectors.toSet()); - } - - return new BasicAuthPrincipal(name, toToken(name, password), tables, permissions); - }).collect(Collectors.toList()); - - _accessControl = new BasicAuthAccessControl(principals); + _accessControl = new BasicAuthAccessControl(BasicAuthUtils.extractBasicAuthPrincipals(configuration, PREFIX)); } @Override @@ -97,67 +70,40 @@ public class BasicAuthAccessControlFactory implements AccessControlFactory { @Override public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { + Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders); + boolean response = getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent(); return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent(); } @Override public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) { - return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(accessType)).isPresent(); + Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders); + boolean response = + getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType))) + .isPresent(); + return getPrincipal(httpHeaders) + .filter(p -> p.hasTable(tableName) && p.hasPermission(Objects.toString(accessType))).isPresent(); } @Override public boolean hasAccess(AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) { + Optional<BasicAuthPrincipal> principal = getPrincipal(httpHeaders); + boolean response = getPrincipal(httpHeaders).isPresent(); return getPrincipal(httpHeaders).isPresent(); } private Optional<BasicAuthPrincipal> getPrincipal(HttpHeaders headers) { - return headers.getRequestHeader(HEADER_AUTHORIZATION).stream().map(BasicAuthAccessControlFactory::normalizeToken) - .map(_principals::get).filter(Objects::nonNull).findFirst(); - } - } - - /** - * Container object for basic auth principal - */ - private static class BasicAuthPrincipal { - private final String _name; - private final String _token; - private final Set<String> _tables; - private final Set<AccessType> _permissions; - - public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<AccessType> permissions) { - this._name = name; - this._token = token; - this._tables = tables; - this._permissions = permissions; - } - - public String getName() { - return _name; - } - - public String getToken() { - return _token; - } - - public boolean hasTable(String tableName) { - return _tables.isEmpty() || _tables.contains(tableName); - } - - public boolean hasPermission(AccessType accessType) { - return _permissions.isEmpty() || _permissions.contains(accessType); - } - } + if (headers == null) { + return Optional.empty(); + } - private static String toToken(String name, String password) { - String identifier = String.format("%s:%s", name, password); - return normalizeToken(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes()))); - } + List<String> authHeaders = headers.getRequestHeader(HEADER_AUTHORIZATION); + if (authHeaders == null) { + return Optional.empty(); + } - private static String normalizeToken(String token) { - if (token == null) { - return null; + return authHeaders.stream().map(BasicAuthUtils::normalizeBase64Token).map(_principals::get) + .filter(Objects::nonNull).findFirst(); } - return token.trim().replace("=", ""); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java new file mode 100644 index 0000000..ae44401 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthPrincipal.java @@ -0,0 +1,37 @@ +package org.apache.pinot.core.auth; + +import java.util.Set; + + +/** + * Container object for basic auth principal + */ +public class BasicAuthPrincipal { + private final String _name; + private final String _token; + private final Set<String> _tables; + private final Set<String> _permissions; + + public BasicAuthPrincipal(String name, String token, Set<String> tables, Set<String> permissions) { + this._name = name; + this._token = token; + this._tables = tables; + this._permissions = permissions; + } + + public String getName() { + return _name; + } + + public String getToken() { + return _token; + } + + public boolean hasTable(String tableName) { + return _tables.isEmpty() || _tables.contains(tableName); + } + + public boolean hasPermission(String permission) { + return _permissions.isEmpty() || _permissions.contains(permission); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java new file mode 100644 index 0000000..46cfa6b --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/BasicAuthUtils.java @@ -0,0 +1,100 @@ +package org.apache.pinot.core.auth; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.env.PinotConfiguration; + + +/** + * Utility for configuring basic auth and parsing related http tokens + */ +public final class BasicAuthUtils { + private static final String PASSWORD = "password"; + private static final String PERMISSIONS = "permissions"; + private static final String TABLES = "tables"; + private static final String ALL = "*"; + + private BasicAuthUtils() { + // left blank + } + + /** + * Parse a pinot configuration namespace for access control settings, e.g. "controller.admin.access.control.principals". + * + * <pre> + * Example: + * my.prefix.access.control.principals=admin123,user456 + * my.prefix.access.control.principals.admin123.password=verysecret + * my.prefix.access.control.principals.user456.password=kindasecret + * my.prefix.access.control.principals.user456.tables=stuff,lessImportantStuff + * my.prefix.access.control.principals.user456.permissions=read,update + * </pre> + * + * @param configuration pinot configuration + * @param prefix configuration namespace + * @return list of BasicAuthPrincipals + */ + public static List<BasicAuthPrincipal> extractBasicAuthPrincipals(PinotConfiguration configuration, String prefix) { + String principalNames = configuration.getProperty(prefix); + Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals"); + + return Arrays.stream(principalNames.split(",")).map(rawName -> { + String name = rawName.trim(); + Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name); + + String password = configuration.getProperty(String.format("%s.%s.%s", prefix, name, PASSWORD)); + Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name); + + Set<String> tables = extractSet(configuration, String.format("%s.%s.%s", prefix, name, TABLES)); + Set<String> permissions = extractSet(configuration, String.format("%s.%s.%s", prefix, name, PERMISSIONS)); + + return new BasicAuthPrincipal(name, toBasicAuthToken(name, password), tables, permissions); + }).collect(Collectors.toList()); + } + + private static Set<String> extractSet(PinotConfiguration configuration, String key) { + String input = configuration.getProperty(key); + if (StringUtils.isNotBlank(input) && !ALL.equals(input)) { + return Arrays.stream(input.split(",")).map(String::trim).collect(Collectors.toSet()); + } + return Collections.emptySet(); + } + + /** + * Convert a pair of name and password into a http header-compliant base64 encoded token + * + * @param name user name + * @param password password + * @return base64 encoded basic auth token + */ + @Nullable + public static String toBasicAuthToken(String name, String password) { + if (StringUtils.isBlank(name)) { + return null; + } + String identifier = String.format("%s:%s", name, password); + return normalizeBase64Token(String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes()))); + } + + /** + * Normalize a base64 encoded auth token by stripping redundant padding (spaces, '=') + * + * @param token base64 encoded auth token + * @return normalized auth token + */ + @Nullable + public static String normalizeBase64Token(String token) { + if (token == null) { + return null; + } + return StringUtils.remove(token.trim(), '='); + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java index e490de2..ff4f66f 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java @@ -284,13 +284,15 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS)); taskSpec.setRecordReaderSpec(recordReaderSpec); + String authToken = taskConfigs.get(BatchConfigProperties.AUTH_TOKEN); // TODO + String tableNameWithType = taskConfigs.get(BatchConfigProperties.TABLE_NAME); Schema schema; if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) { schema = JsonUtils .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class); } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) { - schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI)); + schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI), authToken); } else { schema = getSchema(tableNameWithType); } @@ -299,7 +301,8 @@ public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS)) { tableConfig = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class); } else if (taskConfigs.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI)) { - tableConfig = SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI)); + tableConfig = + SegmentGenerationUtils.getTableConfig(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS_URI), authToken); } else { tableConfig = getTableConfig(tableNameWithType); } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index 4c3b41b..d15cfcd 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -119,7 +120,9 @@ public class SegmentPushUtils implements Serializable { try (InputStream inputStream = fileSystem.open(tarFileURI)) { SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT .uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, inputStream, - tableName); + FileUploadDownloadClient.makeAuthHeader(spec.getAuthToken()), Collections.singletonList( + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName)), + FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, controllerURI, response.getStatusCode(), response.getResponse()); return true; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java index 8b71584..c9622ba 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java @@ -32,9 +32,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -67,7 +67,8 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long private File _localTempDir; @Override - public void setup(Context context) throws IOException { + public void setup(Context context) + throws IOException { _jobConf = context.getConfiguration(); Yaml yaml = new Yaml(); String segmentGenerationJobSpecStr = _jobConf.get(SEGMENT_GENERATION_JOB_SPEC); @@ -96,7 +97,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long } else { LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", localPluginsTarFile.getAbsolutePath()); } - + // Register file systems List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs(); for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { @@ -146,8 +147,9 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); - taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); - taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI())); + taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken())); + taskSpec.setTableConfig( + SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken())); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); @@ -186,7 +188,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long .resolve(segmentTarFileName); LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); - + FileUtils.deleteQuietly(localSegmentDir); FileUtils.deleteQuietly(localSegmentTarFile); FileUtils.deleteQuietly(localInputDataFile); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index a315d15..d2273e7 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -18,13 +18,6 @@ */ package org.apache.pinot.plugin.ingestion.batch.spark; -import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID; -import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR; -import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ; -import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName; -import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME; -import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME; - import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -40,11 +33,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -65,6 +57,13 @@ import org.apache.spark.api.java.function.VoidFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_DIR; +import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ; +import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.getFileName; +import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID; +import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_DIR_PROPERTY_NAME; +import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME; + public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable { @@ -302,8 +301,10 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); - taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI())); - taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI())); + taskSpec + .setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken())); + taskSpec.setTableConfig( + SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken())); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index efe1679..52cf3de 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -27,14 +27,13 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.UUID; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; -import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -165,8 +164,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { FileUtils.forceMkdir(localOutputTempDir); //Read TableConfig, Schema - Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()); - TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()); + Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()); + TableConfig tableConfig = + SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()); int numInputFiles = filteredFiles.size(); CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index deb3a6a..162ba71 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -70,6 +70,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { private HelixManager _helixManager; private ServerMetrics _serverMetrics; private ZkHelixPropertyStore<ZNRecord> _propertyStore; + private String _authToken; @Override public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index 5a28884..0e3f1c6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -49,6 +49,7 @@ public class BatchConfigProperties { public static final String PUSH_CONTROLLER_URI = "push.controllerUri"; public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix"; public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix"; + public static final String AUTH_TOKEN = "authToken"; public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri"; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java index 2e86199..35efe27 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java @@ -118,7 +118,7 @@ public class BootstrapTableTool { return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath()) .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerProtocol(_controllerProtocol) .setControllerHost(_controllerHost).setControllerPort(String.valueOf(_controllerPort)).setExecute(true) - .execute(); + .setAuthToken(_token).execute(); } private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile, @@ -178,6 +178,8 @@ public class BootstrapTableTool { tlsSpec.getTrustStorePath(), tlsSpec.getTrustStorePassword()); } + spec.setAuthToken(_token); + IngestionJobLauncher.runIngestionJob(spec); } } else { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java index 30d4c28..1debd42 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java @@ -31,10 +31,13 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.*; +import javax.annotation.Nullable; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.message.BasicHeader; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.core.auth.BasicAuthUtils; import org.apache.pinot.tools.AbstractBaseCommand; import org.apache.pinot.tools.utils.PinotConfigUtils; @@ -83,10 +86,9 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand { final URL url = new URL(urlString); final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setDoOutput(true); + headers.forEach(header -> conn.setRequestProperty(header.getName(), header.getValue())); conn.setRequestMethod(requestMethod); - headers.stream().flatMap(header -> Arrays.stream(header.getElements())) - .forEach(elem -> conn.setRequestProperty(elem.getName(), elem.getValue())); + conn.setDoOutput(true); if (payload != null) { final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8)); @@ -123,16 +125,35 @@ public class AbstractBaseAdminCommand extends AbstractBaseCommand { return PinotConfigUtils.readConfigFromFile(configFileName); } - static List<Header> makeBasicAuth(String user, String password) { - if (StringUtils.isBlank(user)) { - return Collections.emptyList(); + /** + * Generate an (optional) HTTP Authorization header given an auth token + * @see FileUploadDownloadClient#makeAuthHeader(String) + * + * @param authToken auth token + * @return list of 0 or 1 "Authorization" headers + */ + static List<Header> makeAuthHeader(String authToken) { + return FileUploadDownloadClient.makeAuthHeader(authToken); + } + + /** + * Generate auth token from pass-thru token or generate basic auth from user/password pair + * + * @param authToken optional pass-thru token + * @param user optional username + * @param password optional password + * @return auth token, or null if neither pass-thru token nor user info available + */ + @Nullable + static String makeAuthToken(String authToken, String user, String password) { + if (StringUtils.isNotBlank(authToken)) { + return authToken; } - if (StringUtils.isBlank(password)) { - password = ""; + if (StringUtils.isNotBlank(user)) { + return BasicAuthUtils.toBasicAuthToken(user, password); } - String token = "Basic " + Base64.getEncoder().encodeToString((user + ":" + password).getBytes()); - return Collections.singletonList(new BasicHeader("Authorization", token)); + return null; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java index 2bb0a99..fdf7a06 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java @@ -60,6 +60,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -80,10 +83,9 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman @Override public String toString() { - String retString = - ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost - + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password " - + "[hidden]"); + String retString = ("AddSchema -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost + + " -controllerPort " + _controllerPort + " -schemaFile " + _schemaFile + " -user " + _user + " -password " + + "[hidden]"); return ((_exec) ? (retString + " -exec") : retString); } @@ -114,11 +116,15 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman } public void setUser(String user) { - this._user = user; + _user = user; } public void setPassword(String password) { - this._password = password; + _password = password; + } + + public void setAuthToken(String authToken) { + _authToken = authToken; } public AddSchemaCommand setExecute(boolean exec) { @@ -147,9 +153,10 @@ public class AddSchemaCommand extends AbstractBaseAdminCommand implements Comman Schema schema = Schema.fromFile(schemaFile); try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { - fileUploadDownloadClient.addSchema( - FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), - schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList()); + fileUploadDownloadClient.addSchema(FileUploadDownloadClient + .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), + schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)), + Collections.emptyList()); } catch (Exception e) { LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e); return false; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java index 78123e5..e8403fa 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.io.IOException; import java.util.Collections; - import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.NetUtil; @@ -66,6 +65,9 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -91,7 +93,7 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command String retString = ("AddTable -tableConfigFile " + _tableConfigFile + " -schemaFile " + _schemaFile + " -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort - + " -user " + _user + " -password " + "[hidden]"); + + " -user " + _user + " -password " + "[hidden]"); return ((_exec) ? (retString + " -exec") : retString); } @@ -134,6 +136,11 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command return this; } + public AddTableCommand setAuthToken(String authToken) { + _authToken = authToken; + return this; + } + public AddTableCommand setExecute(boolean exec) { _exec = exec; return this; @@ -151,9 +158,10 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command throw e; } try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { - fileUploadDownloadClient.addSchema( - FileUploadDownloadClient.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), - schema.getSchemaName(), schemaFile, makeBasicAuth(_user, _password), Collections.emptyList()); + fileUploadDownloadClient.addSchema(FileUploadDownloadClient + .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)), + schema.getSchemaName(), schemaFile, makeAuthHeader(makeAuthToken(_authToken, _user, _password)), + Collections.emptyList()); } catch (Exception e) { LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e); throw e; @@ -163,7 +171,8 @@ public class AddTableCommand extends AbstractBaseAdminCommand implements Command public boolean sendTableCreationRequest(JsonNode node) throws IOException { String res = AbstractBaseAdminCommand - .sendPostRequest(ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString()); + .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableCreate(), node.toString(), + makeAuthHeader(makeAuthToken(_authToken, _user, _password))); LOGGER.info(res); return res.contains("succesfully added"); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java index 3365314..efd4bf9 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java @@ -67,6 +67,9 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -112,6 +115,11 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman return this; } + public AddTenantCommand setAuthToken(String authToken) { + _authToken = authToken; + return this; + } + public AddTenantCommand setExecute(boolean exec) { _exec = exec; return this; @@ -137,7 +145,7 @@ public class AddTenantCommand extends AbstractBaseAdminCommand implements Comman Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount); String res = AbstractBaseAdminCommand .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(), - tenant.toJsonString(), makeBasicAuth(_user, _password)); + tenant.toJsonString(), makeAuthHeader(makeAuthToken(_authToken, _user, _password))); LOGGER.info(res); System.out.print(res); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java index 6f7d017..12806c5 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.tools.admin.command; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; +import org.apache.pinot.core.auth.BasicAuthUtils; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.BootstrapTableTool; @@ -82,6 +84,9 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -122,7 +127,7 @@ public class BootstrapTableCommand extends AbstractBaseAdminCommand implements C if (_controllerHost == null) { _controllerHost = NetUtil.getHostAddress(); } - String token = ""; // TODO + String token = makeAuthToken(_authToken, _user, _password); return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, token).execute(); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java index d240e77..83ad5f8 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java @@ -23,6 +23,7 @@ import java.net.URL; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpURL; import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.HttpGet; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; @@ -56,6 +57,9 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -75,8 +79,12 @@ public class ChangeTableState extends AbstractBaseAdminCommand implements Comman URI uri = new URI(_controllerProtocol, null, _controllerHost, Integer.parseInt(_controllerPort), URI_TABLES_PATH + _tableName, "state=" + stateValue, null); + String token = makeAuthToken(_authToken, _user, _password); + GetMethod httpGet = new GetMethod(uri.toString()); - httpGet.setRequestHeader("Authorization", null); // TODO + if (StringUtils.isNotBlank(token)) { + httpGet.setRequestHeader("Authorization", token); + } int status = httpClient.executeMethod(httpGet); if (status != 200) { throw new RuntimeException("Failed to change table state, error: " + httpGet.getResponseBodyAsString()); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java index 49884e4..c432146 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java @@ -81,6 +81,9 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.") private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath(); @@ -243,11 +246,7 @@ public class ImportDataCommand extends AbstractBaseAdminCommand implements Comma spec.setCleanUpOutputDir(true); spec.setOverwriteOutput(true); spec.setJobType("SegmentCreationAndTarPush"); - - if (!StringUtils.isBlank(_user)) { - String token = ""; // TODO - spec.setAuthToken(token); - } + spec.setAuthToken(makeAuthToken(_authToken, _user, _password)); // set ExecutionFrameworkSpec ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java index 97b04c6..27d3520 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java @@ -19,12 +19,11 @@ package org.apache.pinot.tools.admin.command; import com.fasterxml.jackson.databind.JsonNode; -import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Iterator; -import org.apache.commons.io.IOUtils; +import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.http.Header; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.spi.utils.JsonUtils; @@ -52,6 +51,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-config", metaVar = "<string>", usage = "Cluster config to operate.") private String _config; @@ -73,8 +75,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem @Override public String toString() { - String toString = "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost " - + _controllerHost + " -controllerPort " + _controllerPort + " -operation " + _operation; + String toString = + "Operate ClusterConfig -controllerProtocol " + _controllerProtocol + " -controllerHost " + _controllerHost + + " -controllerPort " + _controllerPort + " -operation " + _operation; if (_config != null) { toString += " -config " + _config; } @@ -116,6 +119,11 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem return this; } + public OperateClusterConfigCommand setAuthToken(String authToken) { + _authToken = authToken; + return this; + } + public OperateClusterConfigCommand setConfig(String config) { _config = config; return this; @@ -135,7 +143,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem if (StringUtils.isEmpty(_config) && !_operation.equalsIgnoreCase("GET")) { throw new UnsupportedOperationException("Empty config: " + _config); } - String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs"; + String clusterConfigUrl = + _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs"; + List<Header> headers = makeAuthHeader(makeAuthToken(_authToken, _user, _password)); switch (_operation.toUpperCase()) { case "ADD": case "UPDATE": @@ -145,10 +155,9 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem "Bad config: " + _config + ". Please follow the pattern of [Config Key]=[Config Value]"); } String request = JsonUtils.objectToString(Collections.singletonMap(splits[0], splits[1])); - return sendRequest("POST", clusterConfigUrl, request, makeBasicAuth(_user, _password)); + return sendRequest("POST", clusterConfigUrl, request, headers); case "GET": - // TODO - String response = IOUtils.toString(new URI(clusterConfigUrl), StandardCharsets.UTF_8); + String response = sendRequest("GET", clusterConfigUrl, null, headers); JsonNode jsonNode = JsonUtils.stringToJsonNode(response); Iterator<String> fieldNamesIterator = jsonNode.fieldNames(); String results = ""; @@ -159,8 +168,7 @@ public class OperateClusterConfigCommand extends AbstractBaseAdminCommand implem } return results; case "DELETE": - return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null, - makeBasicAuth(_user, _password)); + return sendRequest("DELETE", String.format("%s/%s", clusterConfigUrl, _config), null, headers); default: throw new UnsupportedOperationException("Unsupported operation: " + _operation); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java index 4afb686..72c16e4 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java @@ -21,8 +21,8 @@ package org.apache.pinot.tools.admin.command; import java.util.Collections; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Broker.Request; -import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.common.utils.NetUtil; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; import org.slf4j.Logger; @@ -53,6 +53,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -68,8 +71,8 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman @Override public String toString() { - return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort " + - _brokerPort + " -queryType " + _queryType + " -query " + _query); + return ("PostQuery -brokerProtocol " + _brokerProtocol + " -brokerHost " + _brokerHost + " -brokerPort " + + _brokerPort + " -queryType " + _queryType + " -query " + _query); } @Override @@ -107,6 +110,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman return this; } + public PostQueryCommand setauthToken(String authToken) { + _authToken = authToken; + return this; + } + public PostQueryCommand setQueryType(String queryType) { _queryType = queryType; return this; @@ -133,7 +141,7 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman request = JsonUtils.objectToString(Collections.singletonMap(Request.PQL, _query)); } - return sendRequest("POST", urlString, request, makeBasicAuth(_user, _password)); + return sendRequest("POST", urlString, request, makeAuthHeader(makeAuthToken(_authToken, _user, _password))); } @Override diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index 3630602..db53c31 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.auth.BasicAuthUtils; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.tenant.TenantRole; import org.apache.pinot.spi.env.PinotConfiguration; @@ -202,9 +203,10 @@ public class QuickstartRunner { public void bootstrapTable() throws Exception { + String token = BasicAuthUtils.toBasicAuthToken("admin", "verysecret"); for (QuickstartTableRequest request : _tableRequests) { if (!new BootstrapTableTool("http", InetAddress.getLocalHost().getHostName(), _controllerPorts.get(0), - request.getBootstrapTableDir(), null).execute()) { + request.getBootstrapTableDir(), token).execute()) { throw new RuntimeException("Failed to bootstrap table with request - " + request); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java index d393e57..679a2a6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java @@ -58,6 +58,9 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co @Option(name = "-password", required = false, metaVar = "<String>", usage = "Password for basic auth.") private String _password; + @Option(name = "-authToken", required = false, metaVar = "<String>", usage = "Http auth token.") + private String _authToken; + @Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.") private String _segmentDir = null; @@ -119,6 +122,11 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co return this; } + public UploadSegmentCommand setAuthToken(String authToken) { + _authToken = authToken; + return this; + } + public UploadSegmentCommand setSegmentDir(String segmentDir) { _segmentDir = segmentDir; return this; @@ -159,7 +167,7 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co LOGGER.info("Uploading segment tar file: {}", segmentTarFile); fileUploadDownloadClient .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, - makeBasicAuth(_user, _password), Collections.singletonList(new BasicNameValuePair( + makeAuthHeader(makeAuthToken(_authToken, _user, _password)), Collections.singletonList(new BasicNameValuePair( FileUploadDownloadClient.QueryParameters.TABLE_NAME, _tableName)), FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java index 0354fd9..7a17845 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java @@ -76,6 +76,7 @@ public class PinotConfigUtils { properties.put("controller.admin.access.control.principals.user.password", "secret"); properties.put("controller.admin.access.control.principals.user.tables", "baseballStats"); properties.put("controller.admin.access.control.principals.user.permissions", "read"); + properties.put("pinot.controller.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); return properties; } @@ -176,6 +177,7 @@ public class PinotConfigUtils { properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort); properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir); properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, serverSegmentDir); + properties.put("pinot.server.segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); return properties; } @@ -188,6 +190,7 @@ public class PinotConfigUtils { Map<String, Object> properties = new HashMap<>(); properties.put(CommonConstants.Helix.KEY_OF_MINION_HOST, minionHost); properties.put(CommonConstants.Helix.KEY_OF_MINION_PORT, minionPort != 0 ? minionPort : getAvailablePort()); + properties.put("segment.fetcher.auth.token", "Basic YWRtaW46dmVyeXNlY3JldA=="); return properties; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org