vrajat commented on code in PR #14110:
URL: https://github.com/apache/pinot/pull/14110#discussion_r1870600576


##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {

Review Comment:
   This is the wrong pattern. There is a single `PinotFs` object in a hash map 
in `PinotFsFactory` which is re-used. `PinotFs.close` is called in 
`PinotFsFactory.shutdown` which is called in `**Starter.stop`. I've remove the 
try-catch block.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {

Review Comment:
   Done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));

Review Comment:
   Done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);

Review Comment:
   No.  I dont have a name for the container of response stores. A response 
store is for a query. Even in the resource, list action (`GET /`) returns a 
list of a response stores. Delete, deletes a response store. You commented 
about inaccuracy between `result store` and `response store`. That was a name 
for the container in some version. I removed that as that also got confusing 
and burdensome. 



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;

Review Comment:
   Done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;

Review Comment:
   Done. Used `Path.of` which seems to be the preferred variant



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {

Review Comment:
   Unfortunately this is not anomalous. The listing in S3 is like a walkTree 
and returns many paths in a subdir including the root. root wont have a 
metadata file.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResponseFileOS = 
Files.newOutputStream(tempResponseFile)) {
+      _responseSerde.serialize(response, tempResponseFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);

Review Comment:
   🤷 



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResponseFileOS = 
Files.newOutputStream(tempResponseFile)) {
+      _responseSerde.serialize(response, tempResponseFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+
+    try (OutputStream tempResultTableFileOS = 
Files.newOutputStream(tempResultTableFile)) {
+      _responseSerde.serialize(resultTable, tempResultTableFileOS);
+    }
+
+    try {
+      pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);
+      return pinotFS.length(tempResultTableFile.toUri());

Review Comment:
   You are right. I use the `dataFile` now. I checked `S3PinotFs` 
implementation and it reads the metadata. So its not that bad. Network I/O 
still exists.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,

Review Comment:
   Done



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "broker"
+      + File.separator + "result_store" + File.separator;
+  public static final String DEFAULT_SCHEME = "file://";
+  public static final String DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR + "tmp";
+  public static final String DEFAULT_DATA_DIR = DEFAULT_SCHEME + 
DEFAULT_ROOT_DIR + "data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private BrokerMetrics _brokerMetrics;
+  private JsonResponseSerde _responseSerde;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _expirationIntervalInMs;
+  private String _fileExtension;
+
+  private static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = new JsonResponseSerde();
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @NotNull
+  @Override
+  protected BrokerMetrics getBrokerMetrics() {
+    return _brokerMetrics;
+  }
+
+  @Override
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    try (PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme())) {
+      URI queryDir = combinePath(_dataDir, requestId);
+      return pinotFS.exists(queryDir);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());

Review Comment:
   There is a single `PinotFs` object in a hash map in `PinotFsFactory` which 
is re-used. `PinotFs.close` is called in `PinotFsFactory.shutdown` which is 
called in `**Starter.stop`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to