yashmayya commented on code in PR #14110:
URL: https://github.com/apache/pinot/pull/14110#discussion_r1865823382


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable for a requestId")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),

Review Comment:
   `PinotClientRequest.makeHttpIdentity` should probably be moved to a shared
utils class instead of being called via its fully-qualified name here.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable for a requestId")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+        return _responseStore.readResponse(requestId);
+      } else {
+        throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+            .entity(String.format("Query results for %s not found.", 
requestId)).build());
+      }
+    } catch (WebApplicationException wae) {
+      throw wae;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}/results")
+  @ApiOperation(value = "Get result set from a result store for a query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void getSqlQueryResult(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @ApiParam(value = "Offset in the result set", required = true) 
@QueryParam("offset") int offset,
+      @ApiParam(value = "Number of rows to fetch") @QueryParam("numRows") 
Integer numRows,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }

Review Comment:
   This exists / authorize check (and the not-found `else` branch) is
duplicated in `GET /{requestId}` and `GET /{requestId}/results`; can it be
extracted into a shared method?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable for a requestId")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+        return _responseStore.readResponse(requestId);
+      } else {
+        throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+            .entity(String.format("Query results for %s not found.", 
requestId)).build());
+      }
+    } catch (WebApplicationException wae) {
+      throw wae;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}/results")
+  @ApiOperation(value = "Get result set from a result store for a query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void getSqlQueryResult(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @ApiParam(value = "Offset in the result set", required = true) 
@QueryParam("offset") int offset,
+      @ApiParam(value = "Number of rows to fetch") @QueryParam("numRows") 
Integer numRows,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+
+        if (numRows == null) {
+          numRows = 
_brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+              CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+        }
+
+        if (numRows > CommonConstants.CursorConfigs.MAX_CURSOR_FETCH_ROWS) {
+          throw new WebApplicationException(
+              "Result Size greater than " + 
CommonConstants.CursorConfigs.MAX_CURSOR_FETCH_ROWS + " not allowed",
+              Response.status(Response.Status.BAD_REQUEST).build());
+        }
+
+        asyncResponse.resume(
+            
PinotClientRequest.getPinotQueryResponse(_responseStore.handleCursorRequest(requestId,
 offset, numRows)));
+      } else {
+        throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+            .entity(String.format("Query results for %s not found.", 
requestId)).build());
+      }
+    } catch (WebApplicationException wae) {
+      asyncResponse.resume(wae);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);

Review Comment:
   `UNCAUGHT_POST_EXCEPTIONS` -> `UNCAUGHT_GET_EXCEPTIONS`? (This is a GET
handler, and the log message above says "processing GET request".)



##########
pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java:
##########
@@ -97,6 +97,8 @@ public static class Cluster {
     public static final String UPLOAD_SEGMENT = "UploadSegment";
     public static final String GET_INSTANCE_PARTITIONS = 
"GetInstancePartitions";
     public static final String UPDATE_INSTANCE_PARTITIONS = 
"UpdateInstancePartitions";
+    public static final String GET_RESPONSE_STORE = "GetResponseStore";
+    public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore";

Review Comment:
   Aren't these names a little misleading? We're getting / deleting results
from the response store, not the response store itself, right?



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created

Review Comment:
   ```suggestion
      * @param brokerHost Hostname of the broker where ResponseStore is created
      * @param brokerPort Port of the broker where the ResponseStore is created
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);

Review Comment:
   I didn't get this part: why not store the result table and cursor response
together?



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      deleteResponse(requestId);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
@link{ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the @link{ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " is greater than 
totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId);

Review Comment:
   Ideally we'd want to be able to read slices of the result table directly 
from the response store but while that might not be straightforward currently, 
could we at least consider adding an in-memory cache with TTL (Guava has some 
nice implementations IIRC) here? Since most use cases would likely involve 
paging through cursor responses in a short time window, this might be able to 
provide a pretty large performance boost, WDYT?



##########
pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.pinot.common.response.CursorResponse;
+
+
+@JsonPropertyOrder({
+    "resultTable", "numRowsResultSet", "partialResult", "exceptions", 
"numGroupsLimitReached", "timeUsedMs",
+    "requestId", "brokerId", "numDocsScanned", "totalDocs", 
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numServersQueried", "numServersResponded", "numSegmentsQueried", 
"numSegmentsProcessed", "numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", 
"numConsumingSegmentsMatched",
+    "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", 
"numSegmentsPrunedByServer",
+    "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", 
"numSegmentsPrunedByValue", "brokerReduceTimeMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", 
"offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
+    "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries",
+    // Fields specific to CursorResponse
+    "offset", "numRows", "cursorResultWriteTimeMs", "cursorResultWriteTimeMs", 
"submissionTimeMs", "expirationTimeMs",
+    "brokerHost", "brokerPort", "bytesWritten"
+})
+public class CursorResponseNative extends BrokerResponseNative implements 
CursorResponse {
+  private int _offset;
+  private int _numRows;
+  private long _cursorResultWriteTimeMs;
+  private long _cursorFetchTimeMs;
+  private long _submissionTimeMs;
+  private long _expirationTimeMs;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _bytesWritten;
+
+  public CursorResponseNative() {
+  }
+
+  @Override
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  public void setBrokerHost(String brokerHost) {
+    _brokerHost = brokerHost;
+  }
+
+  @Override
+  public int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  public void setBrokerPort(int brokerPort) {
+    _brokerPort = brokerPort;
+  }
+
+  @Override
+  public void setOffset(int offset) {
+    _offset = offset;
+  }
+
+  @Override
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @Override
+  public void setCursorFetchTimeMs(long cursorFetchTimeMs) {
+    _cursorFetchTimeMs = cursorFetchTimeMs;
+  }
+
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  @Override
+  public void setSubmissionTimeMs(long submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+  }
+
+  public long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  @Override
+  public void setBytesWritten(long bytesWritten) {
+    _bytesWritten = bytesWritten;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  @Override
+  public void setExpirationTimeMs(long expirationTimeMs) {
+    _expirationTimeMs = expirationTimeMs;
+  }
+
+  @Override
+  public int getOffset() {
+    return _offset;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @JsonProperty("cursorResultWriteTimeMs")

Review Comment:
   Why is this only needed here?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResultStore.java:
##########
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.cursors;
+
+import com.google.auto.service.AutoService;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+@AutoService(ResponseStore.class)
+public class MemoryResultStore extends AbstractResponseStore {
+  private final Map<String, CursorResponse> _cursorResponseMap = new 
HashMap<>();
+  private final Map<String, ResultTable> _resultTableMap = new HashMap<>();
+
+  private static final String TYPE = "memory";
+
+  private BrokerMetrics _brokerMetrics;
+  private ResponseSerde _responseSerde;

Review Comment:
   nit: unused and can be removed.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */

Review Comment:
   Intentionally empty? I think a sentence or two for description might be 
useful.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -157,6 +162,10 @@ public void processSqlQueryGet(@ApiParam(value = "Query", 
required = true) @Quer
   })
   @ManualAuthorization
   public void processSqlQueryPost(String query, @Suspended AsyncResponse 
asyncResponse,
+      @ApiParam(value = "Return a cursor instead of complete result set") 
@QueryParam("getCursor")
+      @DefaultValue("false") boolean getCursor,
+      @ApiParam(value = "Number of rows to fetch. Applicable only getCursor is 
true") @QueryParam("numRows")

Review Comment:
   ```suggestion
         @ApiParam(value = "Number of rows to fetch. Applicable only when 
getCursor is true") @QueryParam("numRows")
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable for a requestId")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+        return _responseStore.readResponse(requestId);
+      } else {
+        throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+            .entity(String.format("Query results for %s not found.", 
requestId)).build());
+      }
+    } catch (WebApplicationException wae) {
+      throw wae;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);

Review Comment:
   `UNCAUGHT_POST_EXCEPTIONS` -> `UNCAUGHT_GET_EXCEPTIONS`?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")

Review Comment:
   Is the `notes` part outdated?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -349,9 +355,31 @@ public void start()
       timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, 
brokerId, _routingManager,
           _accessControlFactory, _queryQuotaManager, tableCache, 
queryDispatcher);
     }
+
+    LOGGER.info("Initializing PinotFSFactory");
+    
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
+
+    LOGGER.info("Initialize ResultStore");
+    PinotConfiguration resultStoreConfiguration =
+        
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
+    ResponseSerde responseSerde = 
ResponseSerdeService.getInstance().getResponseSerde(
+        
resultStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE,
+            CommonConstants.CursorConfigs.DEFAULT_RESPONSE_SERDE));
+    
responseSerde.init(resultStoreConfiguration.subset(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE)
+        .subset(responseSerde.getType()));

Review Comment:
   My 2 cents is to not make this configurable / pluggable unless we have a 
concrete plan to do so in the near future. At the very least we can avoid 
exposing the user configuration until we have more than one serde 
implementation with clear pros / cons for the end user to choose from.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "
+      + "query stores in the result store")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable for a requestId")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+        return _responseStore.readResponse(requestId);
+      } else {
+        throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+            .entity(String.format("Query results for %s not found.", 
requestId)).build());
+      }
+    } catch (WebApplicationException wae) {
+      throw wae;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}/results")
+  @ApiOperation(value = "Get result set from a result store for a query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void getSqlQueryResult(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @ApiParam(value = "Offset in the result set", required = true) 
@QueryParam("offset") int offset,
+      @ApiParam(value = "Number of rows to fetch") @QueryParam("numRows") 
Integer numRows,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      if (_responseStore.exists(requestId)) {
+        CursorResponse response = _responseStore.readResponse(requestId);
+        AccessControl accessControl = _accessControlFactory.create();
+        TableAuthorizationResult result = accessControl.authorize(
+            
org.apache.pinot.broker.api.resources.PinotClientRequest.makeHttpIdentity(requestContext),
+            response.getTablesQueried());
+        if (!result.hasAccess()) {
+          throw new WebApplicationException(
+              
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+        }
+
+        if (numRows == null) {
+          numRows = 
_brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+              CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+        }
+
+        if (numRows > CommonConstants.CursorConfigs.MAX_CURSOR_FETCH_ROWS) {
+          throw new WebApplicationException(
+              "Result Size greater than " + 
CommonConstants.CursorConfigs.MAX_CURSOR_FETCH_ROWS + " not allowed",
+              Response.status(Response.Status.BAD_REQUEST).build());
+        }

Review Comment:
   Why is this max cursor fetch rows not configurable and how did we arrive at 
the number here?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1301,4 +1308,25 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+    public static final String DEFAULT_RESPONSE_SERDE = "json";
+    public static final int MAX_CURSOR_FETCH_ROWS = 100000;
+    public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESPONSE_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String RESPONSE_STORE_TYPE = "type";
+    public static final String RESPONSE_STORE_SERDE = "serde";
+    public static final String CURSOR_FETCH_ROWS = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".fetch.rows";
+    public static final String RESULTS_EXPIRATION_INTERVAL = 
PREFIX_OF_CONFIG_OF_RESPONSE_STORE + ".expiration";
+
+    public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
+        "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD 
= "1h";
+    public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
+        "controller.cluster.response.store.cleaner.initialDelay";
+    public static final String RESPONSE_STORE_AUTH_PREFIX = 
"controller.cluster.response.store.auth";

Review Comment:
   @Jackie-Jiang is this the recommended mechanism to configure controller -> 
broker auth? It seems strange to have a separate config for each such feature 
rather than a common config for controller -> broker auth.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.Collection;
+
+
+/**
+ * ResponseStore stores responses organized by request id.
+ * Since @link{BrokerResponse} cannot be moved SPI package, some of the 
functions are declared in
+ * @link{AbstractResponseStore}
+ * <br/>
+ * Concurrency Model:
+ * <br/>
+ * There are 3 possible roles - writer, reader and delete.
+ * <br/>
+ * There can only be ONE writer and no other concurrent roles can execute.
+ * A query store is written during query execution. During execution, there 
can be no reads or deletes as the
+ * query id would not have been provided to the client.
+ * <br/>
+ * There can be multiple readers. There maybe concurrent deletes but no 
concurrent writes.
+ * Multiple clients can potentially iterate through the result set.
+ * <br/>
+ * There can be multiple deletes. There maybe concurrent reads but no 
concurrent writes.
+ * Multiple clients can potentially call the delete API.
+ * <br/>
+ * Implementations should ensure that concurrent read/delete and delete/delete 
operations are handled correctly.
+ */
+public interface ResponseStore {
+  /**
+   * Get the type of the ResponseStore
+   * @return Type of the store
+   */
+  String getType();
+
+  /**
+   * Checks if the response for a requestId exists.
+   * @param requestId The ID of the request
+   * @return True if response exists else false
+   * @throws Exception Thrown if an error occurs when checking if the response 
exists.
+   */
+  boolean exists(String requestId)
+    throws Exception;
+
+  /**
+   * Get all request ids of responses in the ResponseStore.
+   *
+   * @return List of request ids
+   */
+  Collection<String> getAllStoredRequestIds()
+      throws Exception;
+
+  /**
+   * Delete a response.
+   *
+   * @param requestId Request id of the query.
+   * @return True if response was found and deleted.

Review Comment:
   Is it false only if the response is not found / request id doesn't exist? 
Let's document that too.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.Collection;
+
+
+/**
+ * ResponseStore stores responses organized by request id.
+ * Since @link{BrokerResponse} cannot be moved SPI package, some of the 
functions are declared in
+ * @link{AbstractResponseStore}

Review Comment:
   These are invalid Javadoc links - both in terms of syntax and 
accessibility (neither of the two classes is available here since they're in 
`pinot-common`). We could just use the fully qualified class name instead (or 
even just the simple class name).



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query

Review Comment:
   ```suggestion
      * Write a {@link ResultTable} to the store
      * @param requestId Request ID of the response
      * @param resultTable The {@link ResultTable} of the query
   ```
   
   These links are currently broken, and there are many more such instances here.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -349,9 +355,31 @@ public void start()
       timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, 
brokerId, _routingManager,
           _accessControlFactory, _queryQuotaManager, tableCache, 
queryDispatcher);
     }
+
+    LOGGER.info("Initializing PinotFSFactory");
+    
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
+
+    LOGGER.info("Initialize ResultStore");
+    PinotConfiguration resultStoreConfiguration =
+        
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
+    ResponseSerde responseSerde = 
ResponseSerdeService.getInstance().getResponseSerde(
+        
resultStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE,
+            CommonConstants.CursorConfigs.DEFAULT_RESPONSE_SERDE));
+    
responseSerde.init(resultStoreConfiguration.subset(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE)
+        .subset(responseSerde.getType()));
+
+    String expirationTime = 
getConfig().getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,

Review Comment:
   nit: why not use `_brokerConf` directly instead of the `getConfig()` 
indirection?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -349,9 +355,31 @@ public void start()
       timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, 
brokerId, _routingManager,
           _accessControlFactory, _queryQuotaManager, tableCache, 
queryDispatcher);
     }
+
+    LOGGER.info("Initializing PinotFSFactory");
+    
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
+
+    LOGGER.info("Initialize ResultStore");
+    PinotConfiguration resultStoreConfiguration =
+        
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
+    ResponseSerde responseSerde = 
ResponseSerdeService.getInstance().getResponseSerde(
+        
resultStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE,
+            CommonConstants.CursorConfigs.DEFAULT_RESPONSE_SERDE));
+    
responseSerde.init(resultStoreConfiguration.subset(CommonConstants.CursorConfigs.RESPONSE_STORE_SERDE)
+        .subset(responseSerde.getType()));
+
+    String expirationTime = 
getConfig().getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,
+        CommonConstants.CursorConfigs.DEFAULT_RESULTS_EXPIRATION_INTERVAL);
+
+    _responseStore = (AbstractResponseStore) 
ResponseStoreService.getInstance().getResultStore(

Review Comment:
   Are we using response store / result store terminology interchangeably? 
Unless there's some particular reason to do so that I'm missing, let's unify 
the terminology used.



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)

Review Comment:
   Why do the response and the result table need to be written separately?



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;

Review Comment:
   ```suggestion
      * @param config Subset configuration of 
pinot.broker.cursor.response.store.&lt;type&gt;
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      deleteResponse(requestId);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
@link{ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the @link{ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " is greater than 
totalRecords " + totalTableRows);

Review Comment:
   Maybe we can simply say that the offset is out of range and include 
`totalRecords`? Currently, we can get a confusing message like 
`Offset 500 is greater than totalRecords 500`.



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      deleteResponse(requestId);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
@link{ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the @link{ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " is greater than 
totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int sliceEnd = offset + numRows;
+    if (sliceEnd > totalTableRows) {
+      sliceEnd = totalTableRows;
+      numRows = sliceEnd - offset;
+    }
+
+    response.setResultTable(
+        new ResultTable(resultTable.getDataSchema(), 
resultTable.getRows().subList(offset, sliceEnd)));
+    response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+    response.setOffset(offset);
+    response.setNumRows(numRows);
+    response.setNumRowsResultSet(totalTableRows);
+    return response;
+  }
+
+  public List<CursorResponse> getAllStoredResponses()
+      throws Exception {
+    List<CursorResponse> responses = new ArrayList<>();
+
+    for (String requestId : getAllStoredRequestIds()) {
+      responses.add(readResponse(requestId));
+    }
+
+    return responses;
+  }
+
+  @Override
+  public boolean deleteResponse(String requestId) throws Exception {
+    if (!exists(requestId)) {
+      return false;
+    }
+
+    long bytesWritten = readResponse(requestId).getBytesWritten();
+    
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten * -1);
+    return deleteResponseImpl(requestId);
+  }
+
+  public static CursorResponseNative createCursorResponse(BrokerResponse 
response) {
+    CursorResponseNative responseNative = new CursorResponseNative();
+
+    // Copy all the member variables of BrokerResponse to CursorResponse.

Review Comment:
   Missing `tablesQueried`?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseSerde.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * ResponseSerde is used to serialize and deserialize responses for the Cursor 
Response Store.
+ */
+public interface ResponseSerde {
+  /**
+   * Get the type of response. The type is used to identify the serde 
implementation. Type has to be unique.
+   * @return Type of the serde.
+   */
+  String getType();
+
+  /**
+   * Initialize the Serde from the configuration. The function is called with 
subset config of
+   * "pinot.broker.cursor.response.store.&lt;serde&gt;
+   * @param pinotConfiguration Subset configuration of the Serde
+   */
+  void init(PinotConfiguration pinotConfiguration);
+
+  /**
+   * Serialize an object to the output stream
+   * @param object Object to be serialized
+   * @param stream OutputStream to write to.
+   * @throws IOException Throws an exception when there is an issue writing to 
the output stream.
+   */
+  void serialize(Object object, OutputStream stream)
+      throws IOException;
+
+  /**
+   * Deserialize an input stream to an object of a specific class.
+   * @param stream Input stream to read from.
+   * @param valueType Class of the object
+   * @return An object of type T class
+   * @param <T> Class Type
+   * @throws IOException Thrown if there is an issue to read from the input 
stream.
+   */
+  <T> T deserialize(InputStream stream, Class<T> valueType)

Review Comment:
   This seems too broad for implementations to feasibly support. Should we
split this into separate methods for the cursor response and the result table,
since those two seem to be the only possible use cases here?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -190,6 +190,14 @@ public static boolean isUseMultistageEngine(Map<String, 
String> queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
   }
 
+  public static boolean isGetCursor(Map<String, String> queryOptions) {
+    return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.GET_CURSOR));
+  }
+
+  public static int getCursorNumRows(Map<String, String> queryOptions) {
+    return Integer.parseInt(queryOptions.get(QueryOptionKey.CURSOR_NUM_ROWS));
+  }

Review Comment:
   This assumes that the query options passed in will always contain the num
rows key (otherwise `queryOptions.get(...)` returns `null` and
`Integer.parseInt` throws a `NumberFormatException`). Currently, this should
always be the case since you're making sure to set the option in
`PinotClientRequest`, but it'd be good to document this expectation.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseSerde.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * ResponseSerde is used to serialize and deserialize responses for the Cursor 
Response Store.
+ */
+public interface ResponseSerde {
+  /**
+   * Get the type of response. The type is used to identify the serde 
implementation. Type has to be unique.
+   * @return Type of the serde.
+   */
+  String getType();
+
+  /**
+   * Initialize the Serde from the configuration. The function is called with 
subset config of
+   * "pinot.broker.cursor.response.store.&lt;serde&gt;

Review Comment:
   ```suggestion
      * pinot.broker.cursor.response.store.&lt;serde&gt;
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      deleteResponse(requestId);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
@link{ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the @link{ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " is greater than 
totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int sliceEnd = offset + numRows;
+    if (sliceEnd > totalTableRows) {
+      sliceEnd = totalTableRows;
+      numRows = sliceEnd - offset;
+    }
+
+    response.setResultTable(
+        new ResultTable(resultTable.getDataSchema(), 
resultTable.getRows().subList(offset, sliceEnd)));
+    response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+    response.setOffset(offset);
+    response.setNumRows(numRows);
+    response.setNumRowsResultSet(totalTableRows);
+    return response;
+  }
+
+  public List<CursorResponse> getAllStoredResponses()
+      throws Exception {
+    List<CursorResponse> responses = new ArrayList<>();
+
+    for (String requestId : getAllStoredRequestIds()) {
+      responses.add(readResponse(requestId));
+    }
+
+    return responses;
+  }
+
+  @Override
+  public boolean deleteResponse(String requestId) throws Exception {
+    if (!exists(requestId)) {
+      return false;
+    }
+
+    long bytesWritten = readResponse(requestId).getBytesWritten();
+    
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten * -1);
+    return deleteResponseImpl(requestId);
+  }
+
+  public static CursorResponseNative createCursorResponse(BrokerResponse 
response) {
+    CursorResponseNative responseNative = new CursorResponseNative();
+
+    // Copy all the member variables of BrokerResponse to CursorResponse.
+    responseNative.setResultTable(response.getResultTable());
+    responseNative.setNumRowsResultSet(response.getNumRowsResultSet());
+    responseNative.setExceptions(response.getExceptions());
+    
responseNative.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+    responseNative.setTimeUsedMs(response.getTimeUsedMs());
+    responseNative.setRequestId(response.getRequestId());
+    responseNative.setBrokerId(response.getBrokerId());
+    responseNative.setNumDocsScanned(response.getNumDocsScanned());
+    responseNative.setTotalDocs(response.getTotalDocs());
+    
responseNative.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
+    
responseNative.setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
+    responseNative.setNumServersQueried(response.getNumServersQueried());
+    responseNative.setNumServersResponded(response.getNumServersResponded());
+    responseNative.setNumSegmentsQueried(response.getNumSegmentsQueried());
+    responseNative.setNumSegmentsProcessed(response.getNumSegmentsProcessed());
+    responseNative.setNumSegmentsMatched(response.getNumSegmentsMatched());
+    
responseNative.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+    
responseNative.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+    
responseNative.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+    
responseNative.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+    
responseNative.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+    
responseNative.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+    
responseNative.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+    
responseNative.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+    
responseNative.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+    responseNative.setBrokerReduceTimeMs(response.getBrokerReduceTimeMs());
+    
responseNative.setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
+    
responseNative.setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
+    
responseNative.setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
+    
responseNative.setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
+    
responseNative.setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
+    
responseNative.setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
+    
responseNative.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+    
responseNative.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+    responseNative.setTraceInfo(response.getTraceInfo());

Review Comment:
   Wouldn't it be cleaner to have a copy constructor in the 
`CursorResponseNative` class instead?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ *
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get requestIds of all responses in the result 
store.", notes = "Get requestIds of all "

Review Comment:
   > Get requestIds of all responses in the result store
   
   It's not just the request IDs, right? It seems like the entire cursor
response metadata for each response is being returned here.



##########
pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+public interface CursorResponse extends BrokerResponse {
+
+  void setBrokerHost(String brokerHost);
+
+    /**
+     * get hostname of the processing broker
+     * @return String containing the hostname
+     */
+  String getBrokerHost();
+
+  void setBrokerPort(int brokerPort);
+
+    /**
+     * get port of the processing broker
+     * @return int containing the port.
+     */
+  int getBrokerPort();
+
+  /**
+   * Set the starting offset of result table slice
+   * @param offset Offset of the result table slice
+   */
+  void setOffset(int offset);
+
+  /**
+   * Current offset in the query result.
+   * Starts from 0.
+   * @return current offset.
+   */
+  int getOffset();
+
+  /**
+   * Set the number of rows in the result table slice.
+   * @param numRows Number of rows in the result table slice
+   */
+  void setNumRows(int numRows);
+
+  /**
+   * Number of rows in the current response.
+   * @return Number of rows in the current response.
+   */
+  int getNumRows();
+
+  /**
+   * Return the time to write the results to the query store.
+   * @return time in milliseconds
+   */
+  long getCursorResultWriteTimeMs();
+
+  /**
+   * Time taken to write cursor results to query storage.
+   * @param cursorResultWriteMs Time in milliseconds.
+   */
+  void setCursorResultWriteTimeMs(long cursorResultWriteMs);
+
+  /**
+   * Return the time to fetch results from the query store.
+   * @return time in milliseconds.
+   */
+  long getCursorFetchTimeMs();
+
+  /**
+   * Set the time taken to fetch a cursor. The time is specific to the current 
call.
+   * @param cursorFetchTimeMs time in milliseconds
+   */
+  void setCursorFetchTimeMs(long cursorFetchTimeMs);
+
+  /**
+   * Unix timestamp when the query was submitted. The timestamp is used to 
calculate the expiration time when the
+   * response will be deleted from the response store.
+   * @param submissionTimeMs Unix timestamp when the query was submitted.
+   */
+  void setSubmissionTimeMs(long submissionTimeMs);
+
+  /**
+   * Get the unix timestamp when the query was submitted
+   * @return Submission unix timestamp when the query was submitted
+   */
+  long getSubmissionTimeMs();
+
+  /**
+   * Set the expiration time (unix timestamp) when the response will be 
deleted from the response store.
+   * @param expirationTimeMs unix timestamp when the response expires in the 
response store
+   */
+  void setExpirationTimeMs(long expirationTimeMs);
+
+  /**
+   * Get the expiration time (unix timestamp) when the response will be 
deleted from the response store.
+   * @return  expirationTimeMs unix timestamp when the response expires in the 
response store
+   */  long getExpirationTimeMs();

Review Comment:
   nit: formatting is off (you can use the checkstyle configs 
[here](https://github.com/apache/pinot/tree/master/config) to allow your IDE to 
auto-format as per Pinot style).



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1301,4 +1308,25 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+    public static final String DEFAULT_RESPONSE_SERDE = "json";
+    public static final int MAX_CURSOR_FETCH_ROWS = 100000;
+    public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.

Review Comment:
   Let's keep related configurations together so they're easier to read - 
shouldn't this be placed right before / after `RESULTS_EXPIRATION_INTERVAL`?



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of 
"pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname where ResponseStore is created
+   * @param brokerPort Port where the ResponseStore is created
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param responseSerde The Serde object to use to serialize/deserialize the 
responses
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, BrokerMetrics brokerMetrics,
+      ResponseSerde responseSerde, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the BrokerMetrics object to update metrics
+   * @return A BrokerMetrics object
+   */
+  protected abstract BrokerMetrics getBrokerMetrics();
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected abstract String getBrokerHost();
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected abstract int getBrokerPort();
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected abstract long getExpirationIntervalInMs();
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a @link{ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the @link{ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the @link{ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the @link{ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @return @link{ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. @link{CursorResponse} and 
@link{ResultTable} are stored separately.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = createCursorResponse(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + 
getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, 
response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a 
data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      deleteResponse(requestId);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
@link{ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the @link{ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " is greater than 
totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId);
+    } catch (Exception e) {
+      
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int sliceEnd = offset + numRows;
+    if (sliceEnd > totalTableRows) {
+      sliceEnd = totalTableRows;
+      numRows = sliceEnd - offset;
+    }
+
+    response.setResultTable(
+        new ResultTable(resultTable.getDataSchema(), 
resultTable.getRows().subList(offset, sliceEnd)));
+    response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+    response.setOffset(offset);
+    response.setNumRows(numRows);
+    response.setNumRowsResultSet(totalTableRows);
+    return response;
+  }
+
+  public List<CursorResponse> getAllStoredResponses()
+      throws Exception {
+    List<CursorResponse> responses = new ArrayList<>();
+
+    for (String requestId : getAllStoredRequestIds()) {
+      responses.add(readResponse(requestId));
+    }
+
+    return responses;
+  }
+
+  @Override
+  public boolean deleteResponse(String requestId) throws Exception {
+    if (!exists(requestId)) {
+      return false;
+    }
+
+    long bytesWritten = readResponse(requestId).getBytesWritten();
+    
getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.CURSOR_RESULT_STORE_SIZE, 
bytesWritten * -1);

Review Comment:
   Shouldn't this be done at the end to account for failed deletions?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import javax.annotation.concurrent.ThreadSafe;
+
+
+@ThreadSafe
+public class ResponseStoreService {
+  private static volatile ResponseStoreService _instance = fromServiceLoader();
+
+  private final Set<ResponseStore> _allResponseStores;
+  private final Map<String, ResponseStore> _resultStoreByType;

Review Comment:
   Inconsistent naming can lead to confusion - can we settle on either "response 
store" or "result store" and use that terminology consistently?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import javax.annotation.concurrent.ThreadSafe;
+
+
+@ThreadSafe
+public class ResponseStoreService {
+  private static volatile ResponseStoreService _instance = fromServiceLoader();
+
+  private final Set<ResponseStore> _allResponseStores;
+  private final Map<String, ResponseStore> _resultStoreByType;
+
+  private ResponseStoreService(Set<ResponseStore> storeSet) {
+    _allResponseStores = storeSet;
+    _resultStoreByType = new HashMap<>();
+
+    for (ResponseStore resultStore : storeSet) {
+      _resultStoreByType.put(resultStore.getType(), resultStore);
+    }
+  }
+
+  public static ResponseStoreService getInstance() {
+    return _instance;
+  }
+
+  public static void setInstance(ResponseStoreService service) {
+    _instance = service;
+  }
+
+  public static ResponseStoreService fromServiceLoader() {
+    Set<ResponseStore> storeSet = new HashSet<>();
+    for (ResponseStore resultStore : ServiceLoader.load(ResponseStore.class)) {
+      storeSet.add(resultStore);
+    }
+
+    return new ResponseStoreService(storeSet);
+  }
+
+  public Set<ResponseStore> getAllResultStores() {
+    return _allResponseStores;
+  }
+
+  public Map<String, ResponseStore> getResultStoresByType() {
+    return _resultStoreByType;
+  }
+
+  public ResponseStore getResultStore(String type) {
+    ResponseStore responseStore = _resultStoreByType.get(type);
+
+    if (responseStore == null) {
+      throw new IllegalArgumentException("Unknown ResultStore type: " + type);

Review Comment:
   Same comment here on `ResultStore` vs `ResponseStore`.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1301,4 +1308,25 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+    public static final String DEFAULT_RESPONSE_SERDE = "json";
+    public static final int MAX_CURSOR_FETCH_ROWS = 100000;
+    public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESPONSE_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String RESPONSE_STORE_TYPE = "type";
+    public static final String RESPONSE_STORE_SERDE = "serde";
+    public static final String CURSOR_FETCH_ROWS = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".fetch.rows";
+    public static final String RESULTS_EXPIRATION_INTERVAL = 
PREFIX_OF_CONFIG_OF_RESPONSE_STORE + ".expiration";
+
+    public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
+        "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD 
= "1h";
+    public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
+        "controller.cluster.response.store.cleaner.initialDelay";

Review Comment:
   Why does this need to be configurable? The default random initial delay 
should be good enough, right? Pinot already has a gazillion different 
configurations and it'd be nice to avoid adding unnecessary new ones IMO. It 
leads to a super messy user experience when there are so many irrelevant 
configs to understand and learn about.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to