This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new d03629e add optional http basic auth to pinot broker (#6552) d03629e is described below commit d03629ed6294ad2cf5a52d4b5eb6c0adef58ad5d Author: Alexander Pucher <apuc...@apache.org> AuthorDate: Tue Feb 9 14:43:17 2021 -0800 add optional http basic auth to pinot broker (#6552) Add support for an optional HTTP basic auth provider to pinot-broker, which enables user- and table-level authentication of incoming queries. --- .../pinot/broker/api/HttpRequesterIdentity.java | 47 ++++++ .../broker/api/resources/PinotClientRequest.java | 33 +++- .../broker/BasicAuthAccessControlFactory.java | 169 +++++++++++++++++++++ .../broker/broker/BasicAuthAccessControlTest.java | 152 ++++++++++++++++++ .../api/resources/PinotQueryResource.java | 15 +- 5 files changed, 405 insertions(+), 11 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java new file mode 100644 index 0000000..c2dc2f1 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/HttpRequesterIdentity.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.api; + +import com.google.common.collect.Multimap; +import java.security.Principal; + + +/** + * Identity container for HTTP requests with (optional) authorization headers + */ +public class HttpRequesterIdentity extends RequesterIdentity { + private Multimap<String, String> _httpHeaders; + private String _endpointUrl; + + public Multimap<String, String> getHttpHeaders() { + return _httpHeaders; + } + + public void setHttpHeaders(Multimap<String, String> httpHeaders) { + _httpHeaders = httpHeaders; + } + + public String getEndpointUrl() { + return _endpointUrl; + } + + public void setEndpointUrl(String endpointUrl) { + _endpointUrl = endpointUrl; + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 290de8c..65d2bf6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -20,6 +20,8 @@ package org.apache.pinot.broker.api.resources; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -34,8 +36,10 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.pinot.broker.api.HttpRequesterIdentity; import org.apache.pinot.broker.api.RequestStatistics; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; import org.apache.pinot.common.metrics.BrokerMeter; @@ -70,7 +74,7 @@ public class PinotClientRequest { @ApiParam(value = "Query", required = true) @QueryParam("bql") String query, @ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled, @ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions, - @Suspended AsyncResponse asyncResponse) { + @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) { try { ObjectNode requestJson = JsonUtils.newObjectNode(); requestJson.put(Request.PQL, query); @@ -80,7 +84,7 @@ public class PinotClientRequest { if (debugOptions != null) { requestJson.put(Request.DEBUG_OPTIONS, debugOptions); } - BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics()); + BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics()); asyncResponse.resume(brokerResponse.toJsonString()); } catch (Exception e) { LOGGER.error("Caught exception while processing GET request", e); @@ -95,10 +99,11 @@ public class PinotClientRequest { @Path("query") @ApiOperation(value = "Querying pinot") @ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")}) - public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse) { + public void processQueryPost(String query, @Suspended AsyncResponse asyncResponse, + @Context org.glassfish.grizzly.http.server.Request requestContext) { try { JsonNode requestJson = JsonUtils.stringToJsonNode(query); - BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics()); + BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics()); asyncResponse.resume(brokerResponse); } catch (Exception e) { LOGGER.error("Caught exception while processing POST request", e); @@ -116,7 +121,7 @@ public class PinotClientRequest { public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @QueryParam("sql") String query, @ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled, @ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions, - @Suspended AsyncResponse asyncResponse) { + @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) { try { ObjectNode requestJson = JsonUtils.newObjectNode(); requestJson.put(Request.SQL, query); @@ -128,7 +133,7 @@ public class PinotClientRequest { if (debugOptions != null) { requestJson.put(Request.DEBUG_OPTIONS, debugOptions); } - BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics()); + BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics()); asyncResponse.resume(brokerResponse.toJsonString()); } catch (Exception e) { LOGGER.error("Caught exception while processing GET request", e); @@ -143,7 +148,8 @@ public class PinotClientRequest { @Path("query/sql") @ApiOperation(value = "Querying pinot using sql") @ApiResponses(value = {@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = "Internal Server Error")}) - public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse) { + public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse, + @Context org.glassfish.grizzly.http.server.Request requestContext) { try { JsonNode requestJson = JsonUtils.stringToJsonNode(query); if (!requestJson.has(Request.SQL)) { @@ -152,7 +158,7 @@ public class PinotClientRequest { String queryOptions = constructSqlQueryOptions(); // the only query options as of now are sql related. do not allow any custom query options in sql endpoint ObjectNode sqlRequestJson = ((ObjectNode) requestJson).put(Request.QUERY_OPTIONS, queryOptions); - BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, null, new RequestStatistics()); + BrokerResponse brokerResponse = requestHandler.handleRequest(sqlRequestJson, makeHttpIdentity(requestContext), new RequestStatistics()); asyncResponse.resume(brokerResponse.toJsonString()); } catch (Exception e) { LOGGER.error("Caught exception while processing POST request", e); @@ -165,4 +171,15 @@ public class PinotClientRequest { return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" + Request.QueryOptionKey.RESPONSE_FORMAT + "=" + Request.SQL; } + + private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) { + Multimap<String, String> headers = ArrayListMultimap.create(); + context.getHeaderNames().forEach(key -> context.getHeaders(key).forEach(value -> headers.put(key, value))); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + identity.setEndpointUrl(context.getRequestURL().toString()); + + return identity; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java new file mode 100644 index 0000000..3a670b3 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BasicAuthAccessControlFactory.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker; + +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.api.HttpRequesterIdentity; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.spi.env.PinotConfiguration; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; + + +/** + * Basic Authentication based on http headers. Configured via the "pinot.broker.access.control" family of properties. + * + * <pre> + * Example: + * pinot.broker.access.control.principals=admin123,user456 + * pinot.broker.access.control.principals.admin123.password=verysecret + * pinot.broker.access.control.principals.user456.password=kindasecret + * pinot.broker.access.control.principals.user456.tables=stuff,lessImportantStuff + * </pre> + */ +public class BasicAuthAccessControlFactory extends AccessControlFactory { + private static final String PRINCIPALS = "principals"; + private static final String PASSWORD = "password"; + private static final String TABLES = "tables"; + private static final String TABLES_ALL = "*"; + + private static final String HEADER_AUTHORIZATION = "authorization"; + + private AccessControl _accessControl; + + public BasicAuthAccessControlFactory() { + // left blank + } + + public void init(PinotConfiguration configuration) { + String principalNames = configuration.getProperty(PRINCIPALS); + Preconditions.checkArgument(StringUtils.isNotBlank(principalNames), "must provide principals"); + + List<BasicAuthPrincipal> principals = Arrays.stream(principalNames.split(",")).map(rawName -> { + String name = rawName.trim(); + Preconditions.checkArgument(StringUtils.isNotBlank(name), "%s is not a valid name", name); + + String password = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, PASSWORD)); + Preconditions.checkArgument(StringUtils.isNotBlank(password), "must provide a password for %s", name); + + Set<String> tables = new HashSet<>(); + String tableNames = configuration.getProperty(String.format("%s.%s.%s", PRINCIPALS, name, TABLES)); + if (StringUtils.isNotBlank(tableNames) && !TABLES_ALL.equals(tableNames)) { + tables.addAll(Arrays.asList(tableNames.split(","))); + } + + return new BasicAuthPrincipal(name, toToken(name, password), tables); + }).collect(Collectors.toList()); + + _accessControl = new BasicAuthAccessControl(principals); + } + + public AccessControl create() { + return _accessControl; + } + + /** + * Access Control using header-based basic http authentication + */ + private static class BasicAuthAccessControl implements AccessControl { + private final Map<String, BasicAuthPrincipal> _principals; + + public BasicAuthAccessControl(Collection<BasicAuthPrincipal> principals) { + _principals = principals.stream().collect(Collectors.toMap(BasicAuthPrincipal::getToken, p -> p)); + } + + @Override + public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) { + Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity, "HttpRequesterIdentity required"); + HttpRequesterIdentity identity = (HttpRequesterIdentity) requesterIdentity; + + Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION); + Optional<BasicAuthPrincipal> principalOpt = + tokens.stream().map(BasicAuthAccessControlFactory::normalizeToken).map(_principals::get) + .filter(Objects::nonNull).findFirst(); + + if (!principalOpt.isPresent()) { + // no matching token? reject + return false; + } + + BasicAuthPrincipal principal = principalOpt.get(); + if (principal.getTables().isEmpty() || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource() + .isSetTableName()) { + // no table restrictions? accept + return true; + } + + return principal.getTables().contains(brokerRequest.getQuerySource().getTableName()); + } + } + + /** + * Container object for basic auth principal + */ + private static class BasicAuthPrincipal { + private final String _name; + private final String _token; + private final Set<String> _tables; + + public BasicAuthPrincipal(String name, String token, Set<String> tables) { + _name = name; + _token = token; + _tables = tables; + } + + public String getName() { + return _name; + } + + public Set<String> getTables() { + return _tables; + } + + public String getToken() { + return _token; + } + } + + private static String toToken(String name, String password) { + String identifier = String.format("%s:%s", name, password); + return normalizeToken( + String.format("Basic %s", Base64.getEncoder().encodeToString(identifier.getBytes(StandardCharsets.UTF_8)))); + } + + /** + * Implementations of base64 encoding vary and may generate different numbers of padding characters "=". We normalize + * these by removing any padding. + * + * @param token raw token + * @return normalized token + */ + @Nullable + private static String normalizeToken(String token) { + if (token == null) { + return null; + } + return StringUtils.remove(token.trim(), '='); + } +} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java new file mode 100644 index 0000000..0835b40 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BasicAuthAccessControlTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.broker; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.api.HttpRequesterIdentity; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; + + +public class BasicAuthAccessControlTest { + private static final String TOKEN_USER = "Basic dXNlcjpzZWNyZXQ"; // user:secret + private static final String TOKEN_ADMIN = "Basic YWRtaW46dmVyeXNlY3JldA"; // admin:verysecret + + private static final String HEADER_AUTHORIZATION = "authorization"; + + private AccessControl _accessControl; + + @BeforeClass + public void setup() { + Map<String, Object> config = new HashMap<>(); + config.put("principals", "admin,user"); + config.put("principals.admin.password", "verysecret"); + config.put("principals.user.password", "secret"); + config.put("principals.user.tables", "lessImportantStuff"); + + AccessControlFactory factory = new BasicAuthAccessControlFactory(); + factory.init(new PinotConfiguration(config)); + + _accessControl = factory.create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNullEntity() { + _accessControl.hasAccess(null, null); + } + + @Test + public void testNullToken() { + Multimap<String, String> headers = ArrayListMultimap.create(); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + Assert.assertFalse(_accessControl.hasAccess(identity, null)); + } + + @Test + public void testAllow() { + Multimap<String, String> headers = ArrayListMultimap.create(); + headers.put(HEADER_AUTHORIZATION, TOKEN_USER); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + QuerySource source = new QuerySource(); + source.setTableName("lessImportantStuff"); + + BrokerRequest request = new BrokerRequest(); + request.setQuerySource(source); + + Assert.assertTrue(_accessControl.hasAccess(identity, request)); + } + + @Test + public void testDeny() { + Multimap<String, String> headers = ArrayListMultimap.create(); + headers.put(HEADER_AUTHORIZATION, TOKEN_USER); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + QuerySource source = new QuerySource(); + source.setTableName("veryImportantStuff"); + + BrokerRequest request = new BrokerRequest(); + request.setQuerySource(source); + + Assert.assertFalse(_accessControl.hasAccess(identity, request)); + } + + @Test + public void testAllowAll() { + Multimap<String, String> headers = ArrayListMultimap.create(); + headers.put(HEADER_AUTHORIZATION, TOKEN_ADMIN); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + QuerySource source = new QuerySource(); + source.setTableName("veryImportantStuff"); + + BrokerRequest request = new BrokerRequest(); + request.setQuerySource(source); + + Assert.assertTrue(_accessControl.hasAccess(identity, request)); + } + + @Test + public void testAllowNonTable() { + Multimap<String, String> headers = ArrayListMultimap.create(); + headers.put(HEADER_AUTHORIZATION, TOKEN_USER); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + BrokerRequest request = new BrokerRequest(); + + Assert.assertTrue(_accessControl.hasAccess(identity, request)); + } + + @Test + public void testNormalizeToken() { + Multimap<String, String> headers = ArrayListMultimap.create(); + headers.put(HEADER_AUTHORIZATION, " " + TOKEN_USER + "== "); + + HttpRequesterIdentity identity = new HttpRequesterIdentity(); + identity.setHttpHeaders(headers); + + QuerySource source = new QuerySource(); + source.setTableName("lessImportantStuff"); + + BrokerRequest request = new BrokerRequest(); + request.setQuerySource(source); + + Assert.assertTrue(_accessControl.hasAccess(identity, request)); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index dfac60a..8363e80 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -41,6 +42,7 @@ import javax.ws.rs.Path; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.Utils; import org.apache.pinot.common.exception.QueryException; @@ -210,7 +212,14 @@ public class PinotQueryResource { String url = getQueryURL(protocol, hostNameWithPrefix.substring(hostNameWithPrefix.indexOf("_") + 1), String.valueOf(port), querySyntax); ObjectNode requestJson = getRequestJson(query, traceEnabled, queryOptions, querySyntax); - return sendRequestRaw(url, query, requestJson); + + // forward client-supplied headers + Map<String, String> headers = httpHeaders.getRequestHeaders().entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty()) + .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + return sendRequestRaw(url, query, requestJson, headers); } private ObjectNode getRequestJson(String query, String traceEnabled, String queryOptions, String querySyntax) { @@ -325,10 +334,10 @@ public class PinotQueryResource { } } - public String sendRequestRaw(String url, String query, ObjectNode requestJson) { + public String sendRequestRaw(String url, String query, ObjectNode requestJson, Map<String, String> headers) { try { final long startTime = System.currentTimeMillis(); - final String pinotResultString = sendPostRaw(url, requestJson.toString(), null); + final String pinotResultString = sendPostRaw(url, requestJson.toString(), headers); final long queryTime = System.currentTimeMillis() - startTime; LOGGER.info("Query: " + query + " Time: " + queryTime); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org