This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 078c47a63f [timeseries] Adding requester level access control check for timeseries queries (#16066) 078c47a63f is described below commit 078c47a63f62ff7a39248860a9cd803a9fdd503f Author: Shaurya Chaturvedi <shauryach...@gmail.com> AuthorDate: Fri Jun 13 13:04:29 2025 -0700 [timeseries] Adding requester level access control check for timeseries queries (#16066) Co-authored-by: shauryachats <shauryach...@uber.com> --- .../broker/api/resources/PinotClientRequest.java | 8 +- .../requesthandler/BrokerRequestHandler.java | 2 +- .../BrokerRequestHandlerDelegate.java | 5 +- .../requesthandler/TimeSeriesRequestHandler.java | 24 ++- .../pinot/integration/tests/ClusterTest.java | 4 +- .../tests/TimeSeriesAuthIntegrationTest.java | 177 +++++++++++++++++++++ .../tests/TimeSeriesIntegrationTest.java | 8 +- 7 files changed, 218 insertions(+), 10 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 0428d107cc..ad28f32733 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -78,6 +78,7 @@ import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.query.QueryThreadContext; @@ -292,7 +293,8 @@ public class PinotClientRequest { try { try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) { String queryString = requestCtx.getQueryString(); - PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext); + PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext, + makeHttpIdentity(requestCtx)); if (response.getErrorType() != null && !response.getErrorType().isEmpty()) { asyncResponse.resume(Response.serverError().entity(response).build()); return; @@ -536,8 +538,8 @@ public class PinotClientRequest { } private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString, - RequestContext requestContext) { - return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext); + RequestContext requestContext, RequesterIdentity requesterIdentity) { + return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext, requesterIdentity); } public static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 9dfb43ace9..d4d143747a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -65,7 +65,7 @@ public interface BrokerRequestHandler { * Run a query and use the time-series engine. */ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, - RequestContext requestContext) { + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity) { throw new UnsupportedOperationException("Handler does not support Time Series requests"); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index c2173b3728..4a7929966e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -125,9 +125,10 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { @Override public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, - RequestContext requestContext) { + RequestContext requestContext, RequesterIdentity requesterIdentity) { if (_timeSeriesRequestHandler != null) { - return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext); + return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext, + requesterIdentity); } return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time series query engine not enabled."); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java index 137700b374..7a6ff11147 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -29,7 +29,9 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.http.NameValuePair; @@ -45,6 +47,7 @@ import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.common.utils.HumanReadableDuration; import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.auth.AuthorizationResult; import org.apache.pinot.spi.auth.broker.RequesterIdentity; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.RequestContext; @@ -101,7 +104,7 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { @Override public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, - RequestContext requestContext) { + RequestContext requestContext, RequesterIdentity requesterIdentity) { PinotBrokerTimeSeriesResponse timeSeriesResponse = null; long queryStartTime = System.currentTimeMillis(); try { @@ -109,6 +112,7 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { requestContext.setBrokerId(_brokerId); requestContext.setRequestId(_requestIdGenerator.get()); RangeTimeSeriesRequest timeSeriesRequest = null; + firstStageAccessControlCheck(requesterIdentity); try { timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString); } catch (URISyntaxException e) { @@ -219,4 +223,22 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { } return HumanReadableDuration.from(step).getSeconds(); } + + /** + * First-stage access control check for the request. + * This method checks if the requester has access to the broker to prevent unauthenticated requests from + * using up resources. + * Secondary table-level access control checks will be performed later. + * + * @param requesterIdentity The identity of the requester. + */ + private void firstStageAccessControlCheck(RequesterIdentity requesterIdentity) { + AccessControl accessControl = _accessControlFactory.create(); + AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity); + if (!authorizationResult.hasAccess()) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + throw new WebApplicationException("Permission denied. " + authorizationResult.getFailureMessage(), + Response.Status.FORBIDDEN); + } + } } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index e1fc48bdb0..01442e4048 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -555,12 +555,12 @@ public abstract class ClusterTest extends ControllerTest { * Queries the broker's timeseries query endpoint (/timeseries/api/v1/query_range). * This is used for testing timeseries queries. */ - public JsonNode getTimeseriesQuery(String query, long startTime, long endTime) { + public JsonNode getTimeseriesQuery(String query, long startTime, long endTime, Map<String, String> headers) { try { Map<String, String> queryParams = Map.of("language", "m3ql", "query", query, "start", String.valueOf(startTime), "end", String.valueOf(endTime)); String url = buildQueryUrl(getTimeSeriesQueryApiUrl(getBrokerBaseApiUrl()), queryParams); - JsonNode responseJsonNode = JsonUtils.stringToJsonNode(sendGetRequest(url, Map.of())); + JsonNode responseJsonNode = JsonUtils.stringToJsonNode(sendGetRequest(url, headers)); return sanitizeResponse(responseJsonNode); } catch (Exception e) { throw new RuntimeException("Failed to get timeseries query: " + query, e); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java new file mode 100644 index 0000000000..9c4e885f37 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.message.BasicNameValuePair; +import org.apache.http.HttpStatus; +import org.apache.pinot.client.Connection; +import org.apache.pinot.client.ConnectionFactory; +import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.helix.ControllerRequestClient; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER; +import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_TOKEN; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +public class TimeSeriesAuthIntegrationTest extends TimeSeriesIntegrationTest { + protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesAuthIntegrationTest.class); + + @Override + protected Map<String, String> getHeaders() { + return BasicAuthTestUtils.AUTH_HEADER; + } + + @Override + public ControllerRequestClient getControllerRequestClient() { + if (_controllerRequestClient == null) { + _controllerRequestClient = + new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER); + } + return _controllerRequestClient; + } + + @Override + protected Connection getPinotConnection() { + if (_pinotConnection == null) { + JsonAsyncHttpPinotClientTransportFactory factory = new JsonAsyncHttpPinotClientTransportFactory(); + factory.setHeaders(AUTH_HEADER); + + _pinotConnection = + ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), factory.buildTransport()); + } + return _pinotConnection; + } + + @Override + protected void overrideControllerConf(Map<String, Object> properties) { + BasicAuthTestUtils.addControllerConfiguration(properties); + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + BasicAuthTestUtils.addBrokerConfiguration(brokerConf); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + BasicAuthTestUtils.addServerConfiguration(serverConf); + } + + // TODO: Enhance base method to support custom headers. + @Override + protected void uploadSegments(String tableName, TableType tableType, List<File> tarDirs) + throws Exception { + List<File> segmentTarFiles = new ArrayList<>(); + for (File tarDir : tarDirs) { + File[] tarFiles = tarDir.listFiles(); + assertNotNull(tarFiles); + Collections.addAll(segmentTarFiles, tarFiles); + } + int numSegments = segmentTarFiles.size(); + assertTrue(numSegments > 0); + + URI uploadSegmentHttpURI = URI.create(getControllerRequestURLBuilder().forSegmentUpload()); + NameValuePair + tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName); + NameValuePair tableTypeValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, + tableType.name()); + List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, tableTypeValuePair); + List<Header> headers = List.of(new BasicHeader("Authorization", AUTH_TOKEN)); + + try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { + if (numSegments == 1) { + File segmentTarFile = segmentTarFiles.get(0); + if (System.currentTimeMillis() % 2 == 0) { + assertEquals( + fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, + headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(), HttpStatus.SC_OK); + } else { + assertEquals( + uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient, + segmentTarFile), HttpStatus.SC_OK); + } + } else { + // Upload all segments in parallel + ExecutorService executorService = Executors.newFixedThreadPool(numSegments); + List<Future<Integer>> futures = new ArrayList<>(numSegments); + for (File segmentTarFile : segmentTarFiles) { + futures.add(executorService.submit(() -> { + if (System.currentTimeMillis() % 2 == 0) { + return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), + segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(); + } else { + return uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient, + segmentTarFile); + } + })); + } + executorService.shutdown(); + for (Future<Integer> future : futures) { + assertEquals((int) future.get(), HttpStatus.SC_OK); + } + } + } + } + + private int uploadSegmentWithOnlyMetadata(String tableName, TableType tableType, URI uploadSegmentHttpURI, + FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile) + throws IOException, HttpErrorStatusException { + List<Header> headers = List.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, + String.format("file://%s/%s", segmentTarFile.getParentFile().getAbsolutePath(), + URIUtils.encode(segmentTarFile.getName()))), + new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString()), + new BasicHeader("Authorization", AUTH_TOKEN)); + // Add table name and table type as request parameters + NameValuePair tableNameValuePair = + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName); + NameValuePair tableTypeValuePair = + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.name()); + List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, tableTypeValuePair); + return fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), + segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java index 58ac3fcc0f..98de16b5dc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java @@ -21,6 +21,8 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import java.io.File; +import java.util.Collections; +import java.util.Map; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -164,8 +166,12 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest { ); } + protected Map<String, String> getHeaders() { + return Collections.emptyMap(); + } + private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) { - JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC); + JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, getHeaders()); System.out.println(result); assertEquals(result.get("status").asText(), "success"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org