yashmayya commented on code in PR #14110:
URL: https://github.com/apache/pinot/pull/14110#discussion_r1869179578


##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {

Review Comment:
   Can `getTempPath` be made non-static and use the member variable 
`_localTempDir` directly, instead of taking it as a parameter?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;

Review Comment:
   We can use `Paths::get` instead of manual string concatenation with the file 
separator.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,

Review Comment:
   Same comment as on the earlier `LOGGER.debug(String.format(...))` call in 
this method: please use a parameterized log message here instead of 
`String.format`.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));

Review Comment:
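   We should use parameterized log messages instead of `String.format`, 
especially for debug logs; this avoids unnecessary string formatting, 
allocations and method calls when the log level is disabled, because the 
message is only built lazily. For example: 
`LOGGER.debug("Processing query path: {}", metadata);`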



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());

Review Comment:
   Looks like we are creating `PinotFS` instances and not closing them in all 
the other methods in this class as well?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, tt gets the list of responses. Each of the 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);
+    String responseStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (responseStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {
+    List<InstanceConfig> brokerList = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = new HashMap<>();
+    for (InstanceConfig broker : brokerList) {
+      brokers.put(getInstanceKey(broker.getHostName(), broker.getPort()),
+          new InstanceInfo(broker.getInstanceName(), broker.getHostName(), 
Integer.parseInt(broker.getPort())));
+    }
+
+    try {
+      Map<String, String> requestHeaders = 
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+      Map<String, List<CursorResponseNative>> brokerCursorsMap = 
getAllQueryResults(brokers, requestHeaders);
+
+      String protocol = _controllerConf.getControllerBrokerProtocol();
+      int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+      List<String> brokerUrls = new ArrayList<>();
+      for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
+        for (CursorResponse response : entry.getValue()) {
+          if (response.getExpirationTimeMs() <= currentTime) {
+            InstanceInfo broker = brokers.get(entry.getKey());
+            int port = portOverride > 0 ? portOverride : broker.getPort();
+            brokerUrls.add(
+                String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
+          }
+        }
+        Map<String, String> deleteStatus = getResponseMap(requestHeaders, 
brokerUrls, "DELETE", HttpDelete::new);
+
+        deleteStatus.forEach((key, value) -> LOGGER.info(
+            String.format("ResponseStore delete response - Broker: %s. 
Response: %s", key, value)));
+      }
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage());
+    }
+  }
+
+  private Map<String, List<CursorResponseNative>> 
getAllQueryResults(Map<String, InstanceInfo> brokers,
+      Map<String, String> requestHeaders)
+      throws Exception {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    int portOverride = _controllerConf.getControllerBrokerPortOverride();
+    List<String> brokerUrls = new ArrayList<>();
+    for (InstanceInfo broker : brokers.values()) {
+      int port = portOverride > 0 ? portOverride : broker.getPort();
+      brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol, 
broker.getHost(), port));
+    }
+    LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
+    Map<String, String> strResponseMap = getResponseMap(requestHeaders, 
brokerUrls, "GET", HttpGet::new);
+    return 
strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
e -> {
+      try {
+        return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
+        });
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+  }
+
+  private <T extends HttpUriRequestBase> Map<String, String> 
getResponseMap(Map<String, String> requestHeaders,
+      List<String> brokerUrls, String methodName, Function<String, T> 
httpRequestBaseSupplier)
+      throws Exception {
+    List<Pair<String, String>> urlsAndRequestBodies = new 
ArrayList<>(brokerUrls.size());
+    brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(_executor, 
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+            ResponseStoreCleaner.TIMEOUT_MS, methodName, 
httpRequestBaseSupplier);
+    Map<String, String> responseMap = new HashMap<>();
+    List<String> errMessages = new ArrayList<>(brokerUrls.size());
+    for (int i = 0; i < brokerUrls.size(); i++) {
+      try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
+        // The completion order is different from brokerUrls, thus use uri in 
the response.
+        URI uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+        // Unexpected server responses are collected and returned as exception.
+        if (status != 200) {
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from 
uri='%s'", status, responseString, uri));
+        }
+        responseMap.put((getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort()))), responseString);
+      } catch (Exception e) {
+        LOGGER.error("Failed to get queries", e);

Review Comment:
   This method is also being used to delete responses, right? The "Failed to 
get queries" log message would be misleading for the DELETE calls.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, tt gets the list of responses. Each of the 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";

Review Comment:
   Can't we just set the frequency config to a much smaller number (a few 
seconds, maybe) instead of introducing this hidden configuration?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {

Review Comment:
   Why do we need to create and close the PinotFS on each method call?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;

Review Comment:
   > result_store
   
   I think we've been using `camelCase` or `UpperCamelCase` for file names 
everywhere else?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());

Review Comment:
   Same question here regarding the lifetime of `PinotFS` objects. Also, we 
aren't closing it here, unlike in `exists`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, tt gets the list of responses. Each of the 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);
+    String responseStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (responseStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }

Review Comment:
   Is this only for tests? It's a little strange to read and I think it'd be 
nicer to find an alternate mechanism (like configuring this cleaner to run more 
frequently) instead.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResponseFileOS = 
Files.newOutputStream(tempResponseFile)) {
+      _responseSerde.serialize(response, tempResponseFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);

Review Comment:
   The `PinotFS` interface doesn't allow us to write directly to an output 
stream? I don't see anything like that either, but I find it pretty strange 
😕 



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);

Review Comment:
   Isn't this entire class a "response store"? I'm confused about the 
terminology - for the user, we're calling every query's stored result a 
response store?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResponseFileOS = 
Files.newOutputStream(tempResponseFile)) {
+      _responseSerde.serialize(response, tempResponseFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResultTableFileOS = 
Files.newOutputStream(tempResultTableFile)) {
+      _responseSerde.serialize(resultTable, tempResultTableFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);
+      return pinotFS.length(tempResultTableFile.toUri());

Review Comment:
   It might be cheaper to read the number of bytes from the temp file on disk? 
This will result in additional remote reads for blob-store-based file systems 
and the like, right?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);
+    String responseStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (responseStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {
+    List<InstanceConfig> brokerList = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = new HashMap<>();
+    for (InstanceConfig broker : brokerList) {
+      brokers.put(getInstanceKey(broker.getHostName(), broker.getPort()),
+          new InstanceInfo(broker.getInstanceName(), broker.getHostName(), 
Integer.parseInt(broker.getPort())));
+    }
+
+    try {
+      Map<String, String> requestHeaders = 
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+      Map<String, List<CursorResponseNative>> brokerCursorsMap = 
getAllQueryResults(brokers, requestHeaders);
+
+      String protocol = _controllerConf.getControllerBrokerProtocol();
+      int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+      List<String> brokerUrls = new ArrayList<>();
+      for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
+        for (CursorResponse response : entry.getValue()) {
+          if (response.getExpirationTimeMs() <= currentTime) {
+            InstanceInfo broker = brokers.get(entry.getKey());
+            int port = portOverride > 0 ? portOverride : broker.getPort();
+            brokerUrls.add(
+                String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
+          }
+        }
+        Map<String, String> deleteStatus = getResponseMap(requestHeaders, 
brokerUrls, "DELETE", HttpDelete::new);
+
+        deleteStatus.forEach((key, value) -> LOGGER.info(
+            String.format("ResponseStore delete response - Broker: %s. 
Response: %s", key, value)));

Review Comment:
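   nit: let's use parameterized logging instead of `String.format`, e.g. 
`LOGGER.info("ResponseStore delete response - Broker: {}. Response: {}", key, value)`.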



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {

Review Comment:
   Let's log a warning if the metadata file doesn't exist? That seems like an 
anomalous condition, right?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);
+    String responseStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (responseStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {
+    List<InstanceConfig> brokerList = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = new HashMap<>();
+    for (InstanceConfig broker : brokerList) {
+      brokers.put(getInstanceKey(broker.getHostName(), broker.getPort()),
+          new InstanceInfo(broker.getInstanceName(), broker.getHostName(), 
Integer.parseInt(broker.getPort())));
+    }
+
+    try {
+      Map<String, String> requestHeaders = 
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+      Map<String, List<CursorResponseNative>> brokerCursorsMap = 
getAllQueryResults(brokers, requestHeaders);
+
+      String protocol = _controllerConf.getControllerBrokerProtocol();
+      int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+      List<String> brokerUrls = new ArrayList<>();
+      for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
+        for (CursorResponse response : entry.getValue()) {
+          if (response.getExpirationTimeMs() <= currentTime) {
+            InstanceInfo broker = brokers.get(entry.getKey());
+            int port = portOverride > 0 ? portOverride : broker.getPort();
+            brokerUrls.add(
+                String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
+          }
+        }
+        Map<String, String> deleteStatus = getResponseMap(requestHeaders, 
brokerUrls, "DELETE", HttpDelete::new);

Review Comment:
   Isn't this going to result in duplicate deletes for response stores that are 
not broker-specific (blob stores, etc.)? I see that you've documented this issue 
in the `ResponseStore` interface, warning about multiple deletes and even 
potentially concurrent deletes. I don't get why that's required, though. Can't 
we deduplicate based on the request ID from the cursor response and only issue 
one delete request per query (to any one broker) here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each 
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);

Review Comment:
   We could compute this default in an `else` block below instead, to avoid the 
unnecessary conversion when the 
`controller.cluster.response.store.cleaner.frequencyPeriod` config is set.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response 
store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each of the 
responses has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every 
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.CursorConfigs.RESPONSE_STORE_AUTH_PREFIX);
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String responseStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (responseStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = TimeUnit.SECONDS.convert(
+        
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+        TimeUnit.MILLISECONDS);
+    String responseStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (responseStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {
+    List<InstanceConfig> brokerList = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = new HashMap<>();
+    for (InstanceConfig broker : brokerList) {
+      brokers.put(getInstanceKey(broker.getHostName(), broker.getPort()),
+          new InstanceInfo(broker.getInstanceName(), broker.getHostName(), 
Integer.parseInt(broker.getPort())));
+    }
+
+    try {
+      Map<String, String> requestHeaders = 
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+      Map<String, List<CursorResponseNative>> brokerCursorsMap = 
getAllQueryResults(brokers, requestHeaders);
+
+      String protocol = _controllerConf.getControllerBrokerProtocol();
+      int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+      List<String> brokerUrls = new ArrayList<>();
+      for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
+        for (CursorResponse response : entry.getValue()) {
+          if (response.getExpirationTimeMs() <= currentTime) {
+            InstanceInfo broker = brokers.get(entry.getKey());
+            int port = portOverride > 0 ? portOverride : broker.getPort();
+            brokerUrls.add(
+                String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
+          }
+        }
+        Map<String, String> deleteStatus = getResponseMap(requestHeaders, 
brokerUrls, "DELETE", HttpDelete::new);
+
+        deleteStatus.forEach((key, value) -> LOGGER.info(
+            String.format("ResponseStore delete response - Broker: %s. 
Response: %s", key, value)));
+      }
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage());
+    }
+  }
+
+  private Map<String, List<CursorResponseNative>> 
getAllQueryResults(Map<String, InstanceInfo> brokers,
+      Map<String, String> requestHeaders)
+      throws Exception {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    int portOverride = _controllerConf.getControllerBrokerPortOverride();
+    List<String> brokerUrls = new ArrayList<>();
+    for (InstanceInfo broker : brokers.values()) {
+      int port = portOverride > 0 ? portOverride : broker.getPort();
+      brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol, 
broker.getHost(), port));
+    }
+    LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
+    Map<String, String> strResponseMap = getResponseMap(requestHeaders, 
brokerUrls, "GET", HttpGet::new);
+    return 
strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
e -> {
+      try {
+        return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
+        });
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+  }
+
+  private <T extends HttpUriRequestBase> Map<String, String> 
getResponseMap(Map<String, String> requestHeaders,
+      List<String> brokerUrls, String methodName, Function<String, T> 
httpRequestBaseSupplier)
+      throws Exception {
+    List<Pair<String, String>> urlsAndRequestBodies = new 
ArrayList<>(brokerUrls.size());
+    brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(_executor, 
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+            ResponseStoreCleaner.TIMEOUT_MS, methodName, 
httpRequestBaseSupplier);
+    Map<String, String> responseMap = new HashMap<>();
+    List<String> errMessages = new ArrayList<>(brokerUrls.size());
+    for (int i = 0; i < brokerUrls.size(); i++) {
+      try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
+        // The completion order is different from brokerUrls, thus use uri in 
the response.
+        URI uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+        // Unexpected server responses are collected and returned as exception.
+        if (status != 200) {
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from 
uri='%s'", status, responseString, uri));
+        }
+        responseMap.put((getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort()))), responseString);
+      } catch (Exception e) {
+        LOGGER.error("Failed to get queries", e);
+        // Can't just throw exception from here as there is a need to release 
the other connections.
+        // So just collect the error msg to throw them together after the 
for-loop.
+        errMessages.add(e.getMessage());

Review Comment:
   We'll lose the stack trace context for other exceptions (apart from the one 
we're throwing ourselves above) that are caught and handled here.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1301,4 +1308,25 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+    public static final String DEFAULT_RESPONSE_SERDE = "json";
+    public static final int MAX_CURSOR_FETCH_ROWS = 100000;
+    public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESPONSE_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String RESPONSE_STORE_TYPE = "type";
+    public static final String RESPONSE_STORE_SERDE = "serde";
+    public static final String CURSOR_FETCH_ROWS = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".fetch.rows";
+    public static final String RESULTS_EXPIRATION_INTERVAL = 
PREFIX_OF_CONFIG_OF_RESPONSE_STORE + ".expiration";
+
+    public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
+        "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD 
= "1h";
+    public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
+        "controller.cluster.response.store.cleaner.initialDelay";

Review Comment:
   Not sure why that matters for this task, though. Maybe it's more relevant for 
the other tasks; I haven't taken a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to