This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 078c47a63f [timeseries] Adding requester level access control check for timeseries queries (#16066)
078c47a63f is described below

commit 078c47a63f62ff7a39248860a9cd803a9fdd503f
Author: Shaurya Chaturvedi <shauryach...@gmail.com>
AuthorDate: Fri Jun 13 13:04:29 2025 -0700

    [timeseries] Adding requester level access control check for timeseries queries (#16066)
    
    Co-authored-by: shauryachats <shauryach...@uber.com>
---
 .../broker/api/resources/PinotClientRequest.java   |   8 +-
 .../requesthandler/BrokerRequestHandler.java       |   2 +-
 .../BrokerRequestHandlerDelegate.java              |   5 +-
 .../requesthandler/TimeSeriesRequestHandler.java   |  24 ++-
 .../pinot/integration/tests/ClusterTest.java       |   4 +-
 .../tests/TimeSeriesAuthIntegrationTest.java       | 177 +++++++++++++++++++++
 .../tests/TimeSeriesIntegrationTest.java           |   8 +-
 7 files changed, 218 insertions(+), 10 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 0428d107cc..ad28f32733 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -78,6 +78,7 @@ import 
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.query.QueryThreadContext;
@@ -292,7 +293,8 @@ public class PinotClientRequest {
     try {
       try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
         String queryString = requestCtx.getQueryString();
-        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, requestContext);
+        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, requestContext,
+          makeHttpIdentity(requestCtx));
         if (response.getErrorType() != null && 
!response.getErrorType().isEmpty()) {
           
asyncResponse.resume(Response.serverError().entity(response).build());
           return;
@@ -536,8 +538,8 @@ public class PinotClientRequest {
   }
 
   private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String 
language, String queryString,
-      RequestContext requestContext) {
-    return _requestHandler.handleTimeSeriesRequest(language, queryString, 
requestContext);
+    RequestContext requestContext, RequesterIdentity requesterIdentity) {
+    return _requestHandler.handleTimeSeriesRequest(language, queryString, 
requestContext, requesterIdentity);
   }
 
   public static HttpRequesterIdentity 
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 9dfb43ace9..d4d143747a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -65,7 +65,7 @@ public interface BrokerRequestHandler {
    * Run a query and use the time-series engine.
    */
   default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext) {
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity) {
     throw new UnsupportedOperationException("Handler does not support Time 
Series requests");
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index c2173b3728..4a7929966e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -125,9 +125,10 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
 
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext) {
+      RequestContext requestContext, RequesterIdentity requesterIdentity) {
     if (_timeSeriesRequestHandler != null) {
-      return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, 
rawQueryParamString, requestContext);
+      return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, 
rawQueryParamString, requestContext,
+        requesterIdentity);
     }
     return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time 
series query engine not enabled.");
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 137700b374..7a6ff11147 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -29,7 +29,9 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.http.NameValuePair;
@@ -45,6 +47,7 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.utils.HumanReadableDuration;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.auth.AuthorizationResult;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.RequestContext;
@@ -101,7 +104,7 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
 
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext) {
+      RequestContext requestContext, RequesterIdentity requesterIdentity) {
     PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
     long queryStartTime = System.currentTimeMillis();
     try {
@@ -109,6 +112,7 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
       requestContext.setBrokerId(_brokerId);
       requestContext.setRequestId(_requestIdGenerator.get());
       RangeTimeSeriesRequest timeSeriesRequest = null;
+      firstStageAccessControlCheck(requesterIdentity);
       try {
         timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString);
       } catch (URISyntaxException e) {
@@ -219,4 +223,22 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     }
     return HumanReadableDuration.from(step).getSeconds();
   }
+
+  /**
+   * First-stage access control check for the request.
+   * This method checks if the requester has access to the broker to prevent 
unauthenticated requests from
+   * using up resources.
+   * Secondary table-level access control checks will be performed later.
+   *
+   * @param requesterIdentity The identity of the requester.
+   */
+  private void firstStageAccessControlCheck(RequesterIdentity 
requesterIdentity) {
+    AccessControl accessControl = _accessControlFactory.create();
+    AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity);
+    if (!authorizationResult.hasAccess()) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
+      throw new WebApplicationException("Permission denied. " + 
authorizationResult.getFailureMessage(),
+        Response.Status.FORBIDDEN);
+    }
+  }
 }
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e1fc48bdb0..01442e4048 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -555,12 +555,12 @@ public abstract class ClusterTest extends ControllerTest {
    * Queries the broker's timeseries query endpoint 
(/timeseries/api/v1/query_range).
    * This is used for testing timeseries queries.
    */
-  public JsonNode getTimeseriesQuery(String query, long startTime, long 
endTime) {
+  public JsonNode getTimeseriesQuery(String query, long startTime, long 
endTime, Map<String, String> headers) {
     try {
       Map<String, String> queryParams = Map.of("language", "m3ql", "query", 
query, "start",
         String.valueOf(startTime), "end", String.valueOf(endTime));
       String url = 
buildQueryUrl(getTimeSeriesQueryApiUrl(getBrokerBaseApiUrl()), queryParams);
-      JsonNode responseJsonNode = 
JsonUtils.stringToJsonNode(sendGetRequest(url, Map.of()));
+      JsonNode responseJsonNode = 
JsonUtils.stringToJsonNode(sendGetRequest(url, headers));
       return sanitizeResponse(responseJsonNode);
     } catch (Exception e) {
       throw new RuntimeException("Failed to get timeseries query: " + query, 
e);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java
new file mode 100644
index 0000000000..9c4e885f37
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
+import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_TOKEN;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class TimeSeriesAuthIntegrationTest extends TimeSeriesIntegrationTest {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TimeSeriesAuthIntegrationTest.class);
+
+  @Override
+  protected Map<String, String> getHeaders() {
+    return BasicAuthTestUtils.AUTH_HEADER;
+  }
+
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_controllerRequestClient == null) {
+      _controllerRequestClient =
+        new ControllerRequestClient(_controllerRequestURLBuilder, 
getHttpClient(), AUTH_HEADER);
+    }
+    return _controllerRequestClient;
+  }
+
+  @Override
+  protected Connection getPinotConnection() {
+    if (_pinotConnection == null) {
+      JsonAsyncHttpPinotClientTransportFactory factory = new 
JsonAsyncHttpPinotClientTransportFactory();
+      factory.setHeaders(AUTH_HEADER);
+
+      _pinotConnection =
+        ConnectionFactory.fromZookeeper(getZkUrl() + "/" + 
getHelixClusterName(), factory.buildTransport());
+    }
+    return _pinotConnection;
+  }
+
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    BasicAuthTestUtils.addControllerConfiguration(properties);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+    BasicAuthTestUtils.addBrokerConfiguration(brokerConf);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    super.overrideServerConf(serverConf);
+    BasicAuthTestUtils.addServerConfiguration(serverConf);
+  }
+
+  // TODO: Enhance base method to support custom headers.
+  @Override
+  protected void uploadSegments(String tableName, TableType tableType, 
List<File> tarDirs)
+    throws Exception {
+    List<File> segmentTarFiles = new ArrayList<>();
+    for (File tarDir : tarDirs) {
+      File[] tarFiles = tarDir.listFiles();
+      assertNotNull(tarFiles);
+      Collections.addAll(segmentTarFiles, tarFiles);
+    }
+    int numSegments = segmentTarFiles.size();
+    assertTrue(numSegments > 0);
+
+    URI uploadSegmentHttpURI = 
URI.create(getControllerRequestURLBuilder().forSegmentUpload());
+    NameValuePair
+      tableNameValuePair = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName);
+    NameValuePair tableTypeValuePair = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+      tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, 
tableTypeValuePair);
+    List<Header> headers = List.of(new BasicHeader("Authorization", 
AUTH_TOKEN));
+
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles.get(0);
+        if (System.currentTimeMillis() % 2 == 0) {
+          assertEquals(
+            fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(), segmentTarFile,
+              headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(), HttpStatus.SC_OK);
+        } else {
+          assertEquals(
+            uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
+              segmentTarFile), HttpStatus.SC_OK);
+        }
+      } else {
+        // Upload all segments in parallel
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(() -> {
+            if (System.currentTimeMillis() % 2 == 0) {
+              return 
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(),
+                segmentTarFile, headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+            } else {
+              return uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
+                segmentTarFile);
+            }
+          }));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  private int uploadSegmentWithOnlyMetadata(String tableName, TableType 
tableType, URI uploadSegmentHttpURI,
+    FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+    throws IOException, HttpErrorStatusException {
+    List<Header> headers = List.of(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+        String.format("file://%s/%s", 
segmentTarFile.getParentFile().getAbsolutePath(),
+          URIUtils.encode(segmentTarFile.getName()))),
+      new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+        FileUploadDownloadClient.FileUploadType.METADATA.toString()),
+      new BasicHeader("Authorization", AUTH_TOKEN));
+    // Add table name and table type as request parameters
+    NameValuePair tableNameValuePair =
+      new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName);
+    NameValuePair tableTypeValuePair =
+      new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, 
tableTypeValuePair);
+    return 
fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, 
segmentTarFile.getName(),
+      segmentTarFile, headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index 58ac3fcc0f..98de16b5dc 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -164,8 +166,12 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
     );
   }
 
+  protected Map<String, String> getHeaders() {
+    return Collections.emptyMap();
+  }
+
   private void runGroupedTimeSeriesQuery(String query, int expectedGroups, 
TimeSeriesValidator validator) {
-    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, 
QUERY_END_TIME_SEC);
+    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, 
QUERY_END_TIME_SEC, getHeaders());
     System.out.println(result);
     assertEquals(result.get("status").asText(), "success");
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to