gortiz commented on code in PR #14110:
URL: https://github.com/apache/pinot/pull/14110#discussion_r1808688033


##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {

Review Comment:
   It would be great to document, in the class Javadoc, the directory and file 
structure created by this response store.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_TEMP_DIR = 
"/tmp/pinot/broker/result_store/tmp";
+  public static final String DEFAULT_DATA_DIR = 
"/tmp/pinot/broker/result_store/data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  Path _localTempDir;
+  URI _dataDir;
+  ResponseSerde _responseSerde;
+  String _fileExtension;
+
+  public static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  public static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, BrokerMetrics brokerMetrics, 
ResponseSerde responseSerde)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = responseSerde;
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    return pinotFS.exists(queryDir);
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug(String.format("Found %d paths.", queryPaths.size()));
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added query store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(response, 
Files.newOutputStream(tempResponseFile));
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(resultTable, 
Files.newOutputStream(tempResultTableFile));
+      pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);
+      return pinotFS.length(tempResultTableFile.toUri());
+    } finally {
+      Files.delete(tempResultTableFile);
+    }
+  }
+
+  @Override
+  public CursorResponse readResponse(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    return _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+  }
+
+  @Override
+  protected ResultTable readResultTable(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    return _responseSerde.deserialize(pinotFS.open(dataFile), 
ResultTable.class);

Review Comment:
   Same here: the InputStream returned by `pinotFS.open(dataFile)` should be closed.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_TEMP_DIR = 
"/tmp/pinot/broker/result_store/tmp";
+  public static final String DEFAULT_DATA_DIR = 
"/tmp/pinot/broker/result_store/data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  Path _localTempDir;
+  URI _dataDir;
+  ResponseSerde _responseSerde;
+  String _fileExtension;
+
+  public static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  public static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, BrokerMetrics brokerMetrics, 
ResponseSerde responseSerde)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = responseSerde;
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    return pinotFS.exists(queryDir);
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug(String.format("Found %d paths.", queryPaths.size()));
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added query store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(response, 
Files.newOutputStream(tempResponseFile));
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(resultTable, 
Files.newOutputStream(tempResultTableFile));
+      pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);
+      return pinotFS.length(tempResultTableFile.toUri());
+    } finally {
+      Files.delete(tempResultTableFile);
+    }
+  }
+
+  @Override
+  public CursorResponse readResponse(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    return _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);

Review Comment:
   The InputStream opened here is not closed.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_TEMP_DIR = 
"/tmp/pinot/broker/result_store/tmp";
+  public static final String DEFAULT_DATA_DIR = 
"/tmp/pinot/broker/result_store/data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  Path _localTempDir;
+  URI _dataDir;
+  ResponseSerde _responseSerde;
+  String _fileExtension;
+
+  public static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  public static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, BrokerMetrics brokerMetrics, 
ResponseSerde responseSerde)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = responseSerde;
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    return pinotFS.exists(queryDir);
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug(String.format("Found %d paths.", queryPaths.size()));
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added query store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(response, 
Files.newOutputStream(tempResponseFile));
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(resultTable, 
Files.newOutputStream(tempResultTableFile));

Review Comment:
   The output stream created by `Files.newOutputStream(tempResultTableFile)` should also be closed.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1261,4 +1268,23 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESULT_STORE_TYPE = "file";
+    public static final String DEFAULT_RESULT_SERDE = "json";
+    public static final int MAX_QUERY_RESULT_SIZE = 100000;
+    public static final int DEFAULT_QUERY_RESULT_SIZE = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESULT_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String RESULT_STORE_TYPE = "type";
+    public static final String RESULT_STORE_SERDE = "serde";
+    public static final String QUERY_RESULT_SIZE = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".result.size";
+    public static final String RESULTS_EXPIRATION_INTERVAL = 
PREFIX_OF_CONFIG_OF_RESULT_STORE + ".expiration";
+
+    public static final String RESULT_STORE_CLEANER_FREQUENCY_PERIOD =
+        "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String RESULT_STORE_CLEANER_INITIAL_DELAY =
+        "controller.cluster.response.store.cleaner.initialDelay";

Review Comment:
   Usually defaults are defined close to the property definition. Where are 
they defined in this case?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {

Review Comment:
   Can we have a Javadoc here, please?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1261,4 +1268,23 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESULT_STORE_TYPE = "file";

Review Comment:
   I'm not sure if we should use this as a default. IIUC this store type only 
works when queries go to the broker that opened it, which is difficult to 
control for users in the suggested deployments using k8s (unless there is only 
a single broker).
   
   I understand this result store is useful during development, but honestly I 
would prefer to disable this feature by default and force users to provide an 
actual production-ready result store.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1261,4 +1268,23 @@ public static class NullValuePlaceHolder {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String DEFAULT_RESULT_STORE_TYPE = "file";
+    public static final String DEFAULT_RESULT_SERDE = "json";
+    public static final int MAX_QUERY_RESULT_SIZE = 100000;
+    public static final int DEFAULT_QUERY_RESULT_SIZE = 10000;
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESULT_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String RESULT_STORE_TYPE = "type";
+    public static final String RESULT_STORE_SERDE = "serde";
+    public static final String QUERY_RESULT_SIZE = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".result.size";

Review Comment:
   Can we indicate in a comment (or by changing the name of the 
constant/property) whether this is bytes or rows?



##########
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java:
##########
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  protected BrokerMetrics _brokerMetrics;
+
+  /**
+   * Initialize the store.
+   * @param config Configuration of the store.

Review Comment:
   thanks



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -46,13 +50,23 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
   private final BaseSingleStageBrokerRequestHandler 
_singleStageBrokerRequestHandler;
   private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
   private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
+  private final AbstractResponseStore _resultStore;
+  private final String _brokerHost;
+  private final int _brokerPort;
+  private final long _expirationIntervalInMs;
+
 
   public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler 
singleStageBrokerRequestHandler,
       @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
-      @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
+      @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler,
+      String brokerHost, int brokerPort, AbstractResponseStore resultStore, 
String expirationTime) {
     _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
     _multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
     _timeSeriesRequestHandler = timeSeriesRequestHandler;
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _resultStore = resultStore;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);

Review Comment:
   I'm not convinced by the way the expiration time and 
`CursorResponse.setExpirationTimeMs` are calculated here. IIUC, in 
`BaseBrokerStarter` we read a property each time we receive a cursor query, 
then send the value as a string here, and once we start the query and get the 
result, we calculate the actual expiration time as (conceptually, not in that 
Java class) an Instant.
   
   Instead I would suggest to:
   - In `BaseBrokerStarter`, read the property and transform it into an actual 
object at startup. The performance here is not that important, but the ease of 
use is. Imagine a case where the config is incorrectly set to something that 
is not a period. With the current code we would start silently and then fail 
whenever we receive a cursor query. With my suggestion we would fail at 
startup time, which IMHO is a much better way to detect the misconfiguration.
   - The actual instant when the data should expire could also be calculated 
in `BaseBrokerStarter`. The difference here is minor (whether it counts the 
time to execute the query or not) but it simplifies the code significantly. 
This is probably a less important change compared to the one above.
   



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_TEMP_DIR = 
"/tmp/pinot/broker/result_store/tmp";
+  public static final String DEFAULT_DATA_DIR = 
"/tmp/pinot/broker/result_store/data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  Path _localTempDir;
+  URI _dataDir;
+  ResponseSerde _responseSerde;
+  String _fileExtension;
+
+  public static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  public static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, BrokerMetrics brokerMetrics, 
ResponseSerde responseSerde)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;

Review Comment:
   is _brokerMetrics actually used?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -46,13 +50,23 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
   private final BaseSingleStageBrokerRequestHandler 
_singleStageBrokerRequestHandler;

Review Comment:
   Originally, as explained in the javadoc, this class was designed to be a 
wrapper on top of other request handlers. But now it is adding some logic that 
almost doubles the size of the class.
   
   Have you considered applying the decorator pattern here? You could create a 
`CursorBrokerRequestHandler` class that implements `BrokerRequestHandler` and 
delegates most of the logic to another `BrokerRequestHandler` but adds the 
cursor logic on top of it. I haven't gone into the details, but it looks like 
that is something we could apply here to reduce the complexity of this and the 
suggested new class, keeping the single responsibility principle.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResultStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String resultStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESULT_STORE_CLEANER_INITIAL_DELAY);
+    if (resultStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(resultStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = 0;
+    String resultStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESULT_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (resultStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(resultStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {

Review Comment:
   nit: this method declares `brokerResponses` and `responses` variables. The 
first is the list of cursors open on each broker, which the method iterates 
over to then ask each broker to delete it if needed. `responses` stores the 
result of that second request to brokers.
   
   It may be subjective, but the names are not very clear. Both are responses 
from brokers. I would appreciate looking for better names, like `knownCursors` 
and `cleanResponses` or something like that.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResultStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+    String resultStoreCleanerTaskInitialDelay =
+        
config.getProperty(CommonConstants.CursorConfigs.RESULT_STORE_CLEANER_INITIAL_DELAY);
+    if (resultStoreCleanerTaskInitialDelay != null) {
+      initialDelay = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(resultStoreCleanerTaskInitialDelay),
+          TimeUnit.MILLISECONDS);
+    }
+    return initialDelay;
+  }
+
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    long frequencyInSeconds = 0;
+    String resultStoreCleanerTaskPeriod =
+        
config.getProperty(CommonConstants.CursorConfigs.RESULT_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (resultStoreCleanerTaskPeriod != null) {
+      frequencyInSeconds = 
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(resultStoreCleanerTaskPeriod),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return frequencyInSeconds;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    long cleanAtMs = System.currentTimeMillis();
+    String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    if (cleanAtMsStr != null) {
+      cleanAtMs = Long.parseLong(cleanAtMsStr);
+    }
+    doClean(cleanAtMs);
+  }
+
+  public void doClean(long currentTime) {
+    List<InstanceConfig> brokerList = 
_pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = brokerList.stream().collect(
+        Collectors.toMap(x -> getInstanceKey(x.getHostName(), x.getPort()),
+            x -> new InstanceInfo(x.getInstanceName(), x.getHostName(), 
Integer.parseInt(x.getPort()))));

Review Comment:
   nit: 
   
   I find this style of code difficult to read:
   - we are just using streams to create a map, maybe a for loop would be 
simpler.
   - The key mapper and the value mapper are two functions that are 
conceptually similar but their indentation is different. Could you move `x -> 
getInstanceKey(...)` to the next line so they are indented in the same way?
   - That `x` parameter is not very informative. Instead you can use `broker` 
or at least `b` to make it easier to see it is actually a broker.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";

Review Comment:
   These two constants are templates that are used to build the URI we are 
going to call to get/delete cursors. I don't think it is very useful to declare 
them as constants 60-90 lines above the single place they are going to be 
used. Instead I would prefer to see the template directly where we use them, 
especially if they are going to be used only once.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java:
##########
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseSerde;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final String DEFAULT_TEMP_DIR = 
"/tmp/pinot/broker/result_store/tmp";
+  public static final String DEFAULT_DATA_DIR = 
"/tmp/pinot/broker/result_store/data";
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  Path _localTempDir;
+  URI _dataDir;
+  ResponseSerde _responseSerde;
+  String _fileExtension;
+
+  public static Path getTempPath(Path localTempDir, String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return localTempDir.resolve(filename.toString());
+  }
+
+  public static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, BrokerMetrics brokerMetrics, 
ResponseSerde responseSerde)
+      throws Exception {
+    _brokerMetrics = brokerMetrics;
+    _responseSerde = responseSerde;
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = Paths.get(config.getProperty(TEMP_DIR, DEFAULT_TEMP_DIR));
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = new URI(config.getProperty(DATA_DIR, DEFAULT_DATA_DIR));
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    return pinotFS.exists(queryDir);
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, 
true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug(String.format("Found %d paths.", queryPaths.size()));
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug(String.format("Processing query path: %s", 
metadata.toString()));
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug(
+              String.format("Checking for query dir %s & metadata file: %s. 
Metadata file exists: %s", queryDir,
+                  metadataFile, metadataFileExists));
+          if (metadataFileExists) {
+            BrokerResponse response =
+                _responseSerde.deserialize(pinotFS.open(metadataFile), 
CursorResponseNative.class);
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added query store {}", queryDir);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    if (pinotFS.exists(queryDir)) {
+      pinotFS.delete(queryDir, true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath(_localTempDir, "response", requestId);
+    URI metadataFile = combinePath(queryDir, 
String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(response, 
Files.newOutputStream(tempResponseFile));
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      Files.delete(tempResponseFile);
+    }
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", 
requestId);
+    URI dataFile = combinePath(queryDir, 
String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    try {
+      _responseSerde.serialize(resultTable, 
Files.newOutputStream(tempResultTableFile));
+      pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);

Review Comment:
   nit: Couldn't you just use `pinotFS.move`? In this specific case, that could 
be implemented very cheaply in LocalPinotFS (although I can see that it 
currently is not).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to