This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b69d094be Refactoring the upsert compaction related code (#12275)
7b69d094be is described below

commit 7b69d094be7bc410d4bf8fe15e4c30233421e300
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Mon Jan 22 00:55:40 2024 -0800

    Refactoring the upsert compaction related code (#12275)
    
    * Refactoring the upsert compaction related code
    
    1. Fix the issue with fetching validDocId metadata for table with
       a large number of segments. (Added POST API with list of segments
       to be part of the request body)
    2. Added POST support for MultiHttpRequest to cover 1.
    3. Added GET /tables/<tableName>/validDocIdMetadata API on the controller
       for improving debuggability.
    
    * Addressing comments
---
 .../requesthandler/BaseBrokerRequestHandler.java   |   4 +-
 .../apache/pinot/common/http/MultiHttpRequest.java |  51 +++++--
 .../restlet/resources/ValidDocIdMetadataInfo.java  |  56 +++++++
 .../pinot/common/http/MultiHttpRequestTest.java    | 106 ++++++++++++--
 .../api/resources/PinotRunningQueryResource.java   |   2 +-
 .../api/resources/PinotTableRestletResource.java   |  37 +++++
 .../controller/util/CompletionServiceHelper.java   |  52 ++++++-
 .../util/ServerSegmentMetadataReader.java          | 161 ++++++++++++++++++---
 .../pinot/controller/util/TableMetadataReader.java |  30 +++-
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |  41 ++++++
 .../UpsertCompactionTaskExecutor.java              | 141 +-----------------
 .../UpsertCompactionTaskGenerator.java             | 106 ++++----------
 .../UpsertCompactionTaskExecutorTest.java          |   7 +-
 .../UpsertCompactionTaskGeneratorTest.java         |  86 +++--------
 .../readers/CompactedPinotSegmentRecordReader.java | 106 ++++++++++++++
 .../pinot/server/api/resources/TablesResource.java |  57 ++++++--
 .../pinot/server/api/TablesResourceTest.java       |  22 +++
 17 files changed, 715 insertions(+), 350 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index b2da166510..cc640110fc 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -209,9 +209,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     // TODO: Use different global query id for OFFLINE and REALTIME table 
after releasing 0.12.0. See QueryIdUtils for
     //       details
     String globalQueryId = getGlobalQueryId(requestId);
-    List<String> serverUrls = new ArrayList<>();
+    List<Pair<String, String>> serverUrls = new ArrayList<>();
     for (ServerInstance serverInstance : queryServers._servers) {
-      serverUrls.add(String.format("%s/query/%s", 
serverInstance.getAdminEndpoint(), globalQueryId));
+      serverUrls.add(Pair.of(String.format("%s/query/%s", 
serverInstance.getAdminEndpoint(), globalQueryId), null));
     }
     LOGGER.debug("Cancelling the query: {} via server urls: {}", 
queryServers._query, serverUrls);
     CompletionService<MultiHttpRequestResponse> completionService =
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java 
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
index a28674e2ad..73efb2c598 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.http;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionService;
@@ -26,11 +27,14 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.function.Function;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
@@ -66,14 +70,29 @@ public class MultiHttpRequest {
    * @return instance of CompletionService. Completion service will provide
    *   results as they arrive. The order is NOT same as the order of URLs
    */
-  public CompletionService<MultiHttpRequestResponse> execute(List<String> urls,
+  public CompletionService<MultiHttpRequestResponse> executeGet(List<String> 
urls,
       @Nullable Map<String, String> requestHeaders, int timeoutMs) {
-    return execute(urls, requestHeaders, timeoutMs, "GET", HttpGet::new);
+    List<Pair<String, String>> urlsAndRequestBodies = new ArrayList<>();
+    urls.forEach(url -> urlsAndRequestBodies.add(Pair.of(url, "")));
+    return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "GET", 
HttpGet::new);
+  }
+
+  /**
+   * POST urls in parallel using the executor service.
+   * @param urlsAndRequestBodies absolute URLs to POST
+   * @param requestHeaders headers to set when making the request
+   * @param timeoutMs timeout in milliseconds for each POST request
+   * @return instance of CompletionService. Completion service will provide
+   *   results as they arrive. The order is NOT the same as the order of URLs
+   */
+  public CompletionService<MultiHttpRequestResponse> 
executePost(List<Pair<String, String>> urlsAndRequestBodies,
+      @Nullable Map<String, String> requestHeaders, int timeoutMs) {
+    return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "POST", 
HttpPost::new);
   }
 
   /**
    * Execute certain http method on the urls in parallel using the executor 
service.
-   * @param urls absolute URLs to execute the http method
+   * @param urlsAndRequestBodies absolute URLs to execute the http method
    * @param requestHeaders headers to set when making the request
    * @param timeoutMs timeout in milliseconds for each http request
    * @param httpMethodName the name of the http method like GET, DELETE etc.
@@ -81,22 +100,28 @@ public class MultiHttpRequest {
    * @return instance of CompletionService. Completion service will provide
    *   results as they arrive. The order is NOT same as the order of URLs
    */
-  public <T extends HttpRequestBase> 
CompletionService<MultiHttpRequestResponse> execute(List<String> urls,
-      @Nullable Map<String, String> requestHeaders, int timeoutMs, String 
httpMethodName,
-      Function<String, T> httpRequestBaseSupplier) {
+  public <T extends HttpRequestBase> 
CompletionService<MultiHttpRequestResponse> execute(
+      List<Pair<String, String>> urlsAndRequestBodies, @Nullable Map<String, 
String> requestHeaders, int timeoutMs,
+      String httpMethodName, Function<String, T> httpRequestBaseSupplier) {
     // Create global request configuration
-    RequestConfig defaultRequestConfig = RequestConfig.custom()
-        .setConnectionRequestTimeout(timeoutMs)
-        .setSocketTimeout(timeoutMs).build(); // setting the socket
+    RequestConfig defaultRequestConfig =
+        
RequestConfig.custom().setConnectionRequestTimeout(timeoutMs).setSocketTimeout(timeoutMs)
+            .build(); // setting the socket
 
-    HttpClientBuilder httpClientBuilder = HttpClients.custom()
-        
.setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig);
+    HttpClientBuilder httpClientBuilder =
+        
HttpClients.custom().setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig);
 
     CompletionService<MultiHttpRequestResponse> completionService = new 
ExecutorCompletionService<>(_executor);
     CloseableHttpClient client = httpClientBuilder.build();
-    for (String url : urls) {
+    for (Pair<String, String> pair : urlsAndRequestBodies) {
       completionService.submit(() -> {
-        T httpMethod = httpRequestBaseSupplier.apply(url);
+        String url = pair.getLeft();
+        String body = pair.getRight();
+        HttpRequestBase httpMethod = httpRequestBaseSupplier.apply(url);
+        // If the http method is POST, set the request body
+        if (httpMethod instanceof HttpPost) {
+          ((HttpPost) httpMethod).setEntity(new StringEntity(body));
+        }
         if (requestHeaders != null) {
           requestHeaders.forEach(((HttpRequestBase) httpMethod)::setHeader);
         }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
new file mode 100644
index 0000000000..475ba91432
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ValidDocIdMetadataInfo {
+  private final String _segmentName;
+  private final long _totalValidDocs;
+  private final long _totalInvalidDocs;
+  private final long _totalDocs;
+
+  public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String 
segmentName,
+      @JsonProperty("totalValidDocs") long totalValidDocs, 
@JsonProperty("totalInvalidDocs") long totalInvalidDocs,
+      @JsonProperty("totalDocs") long totalDocs) {
+    _segmentName = segmentName;
+    _totalValidDocs = totalValidDocs;
+    _totalInvalidDocs = totalInvalidDocs;
+    _totalDocs = totalDocs;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public long getTotalValidDocs() {
+    return _totalValidDocs;
+  }
+
+  public long getTotalInvalidDocs() {
+    return _totalInvalidDocs;
+  }
+
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
index 61f58f78f6..292e2ec82e 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
@@ -43,6 +44,30 @@ import org.testng.annotations.Test;
 
 
 public class MultiHttpRequestTest {
+  class TestResult {
+    private final int _success;
+    private final int _errors;
+    private final int _timeouts;
+
+    public TestResult(int success, int errors, int timeouts) {
+      _success = success;
+      _errors = errors;
+      _timeouts = timeouts;
+    }
+
+    public int getSuccess() {
+      return _success;
+    }
+
+    public int getErrors() {
+      return _errors;
+    }
+
+    public int getTimeouts() {
+      return _timeouts;
+    }
+  }
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiHttpRequest.class);
   private static final String SUCCESS_MSG = "success";
   private static final String ERROR_MSG = "error";
@@ -61,6 +86,7 @@ public class MultiHttpRequestTest {
     startServer(_portStart, createHandler(SUCCESS_CODE, SUCCESS_MSG, 0));
     startServer(_portStart + 1, createHandler(ERROR_CODE, ERROR_MSG, 0));
     startServer(_portStart + 2, createHandler(SUCCESS_CODE, TIMEOUT_MSG, 
TIMEOUT_MS));
+    startServer(_portStart + 3, createPostHandler(SUCCESS_CODE, SUCCESS_MSG, 
0));
   }
 
   @AfterTest
@@ -90,6 +116,33 @@ public class MultiHttpRequestTest {
     };
   }
 
+  private HttpHandler createPostHandler(final int status, final String msg, 
final int sleepTimeMs) {
+    return new HttpHandler() {
+      @Override
+      public void handle(HttpExchange httpExchange)
+          throws IOException {
+        if (sleepTimeMs > 0) {
+          try {
+            Thread.sleep(sleepTimeMs);
+          } catch (InterruptedException e) {
+            LOGGER.info("Handler interrupted during sleep");
+          }
+        }
+        if (httpExchange.getRequestMethod().equals("POST")) {
+          httpExchange.sendResponseHeaders(status, msg.length());
+          OutputStream responseBody = httpExchange.getResponseBody();
+          responseBody.write(msg.getBytes());
+          responseBody.close();
+        } else {
+          httpExchange.sendResponseHeaders(ERROR_CODE, ERROR_MSG.length());
+          OutputStream responseBody = httpExchange.getResponseBody();
+          responseBody.write(ERROR_MSG.getBytes());
+          responseBody.close();
+        }
+      }
+    };
+  }
+
   private void startServer(int port, HttpHandler handler)
       throws IOException {
     final HttpServer server = HttpServer.create(new InetSocketAddress(port), 
0);
@@ -104,22 +157,58 @@ public class MultiHttpRequestTest {
   }
 
   @Test
-  public void testMultiGet() throws Exception {
-    MultiHttpRequest mget =
-        new MultiHttpRequest(Executors.newCachedThreadPool(), new 
PoolingHttpClientConnectionManager());
+  public void testMultiGet() {
     List<String> urls = Arrays.asList("http://localhost:"; + 
String.valueOf(_portStart) + URI_PATH,
         "http://localhost:"; + String.valueOf(_portStart + 1) + URI_PATH,
         "http://localhost:"; + String.valueOf(_portStart + 2) + URI_PATH,
         // 2nd request to the same server
-        "http://localhost:"; + String.valueOf(_portStart) + URI_PATH);
+        "http://localhost:"; + String.valueOf(_portStart) + URI_PATH,
+        "http://localhost:"; + String.valueOf(_portStart + 3) + URI_PATH);
+
+    MultiHttpRequest mget =
+        new MultiHttpRequest(Executors.newCachedThreadPool(), new 
PoolingHttpClientConnectionManager());
+
     // timeout value needs to be less than 5000ms set above for
     // third server
     final int requestTimeoutMs = 1000;
-    CompletionService<MultiHttpRequestResponse> completionService = 
mget.execute(urls, null, requestTimeoutMs);
+    CompletionService<MultiHttpRequestResponse> completionService = 
mget.executeGet(urls, null, requestTimeoutMs);
+
+    TestResult result = collectResult(completionService, urls.size());
+    Assert.assertEquals(result.getSuccess(), 2);
+    Assert.assertEquals(result.getErrors(), 2);
+    Assert.assertEquals(result.getTimeouts(), 1);
+  }
+
+  @Test
+  public void testMultiPost() {
+    List<Pair<String, String>> urlsAndRequestBodies =
+        List.of(Pair.of("http://localhost:"; + String.valueOf(_portStart) + 
URI_PATH, "b0"),
+            Pair.of("http://localhost:"; + String.valueOf(_portStart + 1) + 
URI_PATH, "b1"),
+            Pair.of("http://localhost:"; + String.valueOf(_portStart + 2) + 
URI_PATH, "b2"),
+            // 2nd request to the same server
+            Pair.of("http://localhost:"; + String.valueOf(_portStart) + 
URI_PATH, "b3"),
+            Pair.of("http://localhost:"; + String.valueOf(_portStart + 3) + 
URI_PATH, "b4"));
+
+    MultiHttpRequest mpost =
+        new MultiHttpRequest(Executors.newCachedThreadPool(), new 
PoolingHttpClientConnectionManager());
+
+    // timeout value needs to be less than 5000ms set above for
+    // third server
+    final int requestTimeoutMs = 1000;
+    CompletionService<MultiHttpRequestResponse> completionService =
+        mpost.executePost(urlsAndRequestBodies, null, requestTimeoutMs);
+
+    TestResult result = collectResult(completionService, 
urlsAndRequestBodies.size());
+    Assert.assertEquals(result.getSuccess(), 3);
+    Assert.assertEquals(result.getErrors(), 1);
+    Assert.assertEquals(result.getTimeouts(), 1);
+  }
+
+  private TestResult collectResult(CompletionService<MultiHttpRequestResponse> 
completionService, int size) {
     int success = 0;
     int errors = 0;
     int timeouts = 0;
-    for (int i = 0; i < urls.size(); i++) {
+    for (int i = 0; i < size; i++) {
       try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
         if (httpRequestResponse.getResponse().getStatusLine().getStatusCode() 
>= 300) {
           errors++;
@@ -143,9 +232,6 @@ public class MultiHttpRequestTest {
         errors++;
       }
     }
-
-    Assert.assertEquals(success, 2);
-    Assert.assertEquals(errors, 1);
-    Assert.assertEquals(timeouts, 1);
+    return new TestResult(success, errors, timeouts);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
index 8aa004b35b..29bdccf4eb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
@@ -187,7 +187,7 @@ public class PinotRunningQueryResource {
     }
     LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
     CompletionService<MultiHttpRequestResponse> completionService =
-        new MultiHttpRequest(_executor, _httpConnMgr).execute(brokerUrls, 
requestHeaders, timeoutMs);
+        new MultiHttpRequest(_executor, _httpConnMgr).executeGet(brokerUrls, 
requestHeaders, timeoutMs);
     Map<String, Map<String, String>> queriesByBroker = new HashMap<>();
     List<String> errMsgs = new ArrayList<>(brokerUrls.size());
     for (int i = 0; i < brokerUrls.size(); i++) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 5d35e1f0c3..d7894c08f7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -950,6 +950,43 @@ public class PinotTableRestletResource {
     return segmentsMetadata;
   }
 
+  @GET
+  @Path("tables/{tableName}/validDocIdMetadata")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the aggregate valid doc id metadata of all 
segments for a table", notes = "Get the "
+      + "aggregate valid doc id metadata of all segments for a table")
+  public String getTableAggregateValidDocIdMetadata(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      @ApiParam(value = "A list of segments", allowMultiple = true) 
@QueryParam("segmentNames")
+      List<String> segmentNames) {
+    LOGGER.info("Received a request to fetch aggregate valid doc id metadata 
for a table {}", tableName);
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == TableType.OFFLINE) {
+      throw new ControllerApplicationException(LOGGER, "Table type : " + 
tableTypeStr + " not yet supported.",
+          Response.Status.NOT_IMPLEMENTED);
+    }
+    String tableNameWithType =
+        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableType, LOGGER).get(0);
+
+    String validDocIdMetadata;
+    try {
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      JsonNode segmentsMetadataJson =
+          
tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType, 
segmentNames,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      validDocIdMetadata = 
JsonUtils.objectToPrettyString(segmentsMetadataJson);
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot 
server response: " + ioe.getMessage(),
+          Response.Status.INTERNAL_SERVER_ERROR, ioe);
+    }
+    return validDocIdMetadata;
+  }
+
   @GET
   @Path("tables/{tableName}/indexes")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index d26f5ba603..f0b36b2cfa 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.http.MultiHttpRequest;
@@ -79,12 +80,46 @@ public class CompletionServiceHelper {
   public CompletionServiceResponse doMultiGetRequest(List<String> serverURLs, 
String tableNameWithType,
       boolean multiRequestPerServer, @Nullable Map<String, String> 
requestHeaders, int timeoutMs,
       @Nullable String useCase) {
-    CompletionServiceResponse completionServiceResponse = new 
CompletionServiceResponse();
-
     // TODO: use some service other than completion service so that we know 
which server encounters the error
     CompletionService<MultiHttpRequestResponse> completionService =
-        new MultiHttpRequest(_executor, 
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
-    for (int i = 0; i < serverURLs.size(); i++) {
+        new MultiHttpRequest(_executor, 
_httpConnectionManager).executeGet(serverURLs, requestHeaders, timeoutMs);
+
+    return collectResponse(tableNameWithType, serverURLs.size(), 
completionService, multiRequestPerServer, useCase);
+  }
+
+  /**
+   * This method makes a MultiPost call to all given URLs and its 
corresponding bodies.
+   * @param serverURLsAndRequestBodies server urls to send GET request.
+   * @param tableNameWithType table name with type suffix
+   * @param multiRequestPerServer it's possible that need to send multiple 
requests to a same server.
+   *                              If multiRequestPerServer is set as false, 
return as long as one of the requests get
+   *                              response; If multiRequestPerServer is set as 
true, wait until all requests
+   *                              get response.
+   * @param requestHeaders Headers to be set when making the http calls.
+   * @param timeoutMs timeout in milliseconds to wait per request.
+   * @param useCase the use case initiating the multi-get request. If not null 
and an exception is thrown, only the
+   *                error message and the use case are logged instead of the 
full stack trace.
+   * @return CompletionServiceResponse Map of the endpoint(server instance, or 
full request path if
+   * multiRequestPerServer is true) to the response from that endpoint.
+   */
+  public CompletionServiceResponse doMultiPostRequest(List<Pair<String, 
String>> serverURLsAndRequestBodies,
+      String tableNameWithType, boolean multiRequestPerServer, @Nullable 
Map<String, String> requestHeaders,
+      int timeoutMs, @Nullable String useCase) {
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(_executor, 
_httpConnectionManager).executePost(serverURLsAndRequestBodies, requestHeaders,
+            timeoutMs);
+
+    return collectResponse(tableNameWithType, 
serverURLsAndRequestBodies.size(), completionService,
+        multiRequestPerServer, useCase);
+  }
+
+  private CompletionServiceResponse collectResponse(String tableNameWithType, 
int size,
+      CompletionService<MultiHttpRequestResponse> completionService, boolean 
multiRequestPerServer,
+      @Nullable String useCase) {
+    CompletionServiceResponse completionServiceResponse = new 
CompletionServiceResponse();
+
+    for (int i = 0; i < size; i++) {
       MultiHttpRequestResponse multiHttpRequestResponse = null;
       try {
         multiHttpRequestResponse = completionService.take().get();
@@ -93,7 +128,8 @@ public class CompletionServiceHelper {
             _endpointsToServers.get(String.format("%s://%s:%d", 
uri.getScheme(), uri.getHost(), uri.getPort()));
         int statusCode = 
multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode();
         if (statusCode >= 300) {
-          LOGGER.error("Server: {} returned error: {}", instance, statusCode);
+          String reason = 
multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase();
+          LOGGER.error("Server: {} returned error: {}, reason: {}", instance, 
statusCode, reason);
           completionServiceResponse._failedResponseCount++;
           continue;
         }
@@ -102,7 +138,7 @@ public class CompletionServiceHelper {
             .put(multiRequestPerServer ? uri.toString() : instance, 
responseString);
       } catch (Exception e) {
         String reason = useCase == null ? "" : String.format(" in '%s'", 
useCase);
-        LOGGER.error("Connection error{}. Details: {}", reason, 
e.getMessage());
+        LOGGER.error("Connection error {}. Details: {}", reason, 
e.getMessage());
         completionServiceResponse._failedResponseCount++;
       } finally {
         if (multiHttpRequestResponse != null) {
@@ -116,9 +152,9 @@ public class CompletionServiceHelper {
     }
 
     int numServersResponded = completionServiceResponse._httpResponses.size();
-    if (numServersResponded != serverURLs.size()) {
+    if (numServersResponded != size) {
       LOGGER.warn("Finished reading information for table: {} with {}/{} 
server responses", tableNameWithType,
-          numServersResponded, serverURLs.size());
+          numServersResponded, size);
     } else {
       LOGGER.info("Finished reading information for table: {}", 
tableNameWithType);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index a51881d9e3..de1cae193c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -18,19 +18,35 @@
  */
 package org.apache.pinot.controller.util;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
+import org.apache.pinot.common.restlet.resources.TableSegments;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +63,11 @@ public class ServerSegmentMetadataReader {
   private final Executor _executor;
   private final HttpClientConnectionManager _connectionManager;
 
+  public ServerSegmentMetadataReader() {
+    _executor = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    _connectionManager = new PoolingHttpClientConnectionManager();
+  }
+
   public ServerSegmentMetadataReader(Executor executor, 
HttpClientConnectionManager connectionManager) {
     _executor = executor;
     _connectionManager = connectionManager;
@@ -100,11 +121,11 @@ public class ServerSegmentMetadataReader {
         tableMetadataInfo.getColumnCardinalityMap().forEach((k, v) -> 
columnCardinalityMap.merge(k, v, Double::sum));
         tableMetadataInfo.getMaxNumMultiValuesMap().forEach((k, v) -> 
maxNumMultiValuesMap.merge(k, v, Double::sum));
         tableMetadataInfo.getColumnIndexSizeMap().forEach((k, v) -> 
columnIndexSizeMap.merge(k, v, (l, r) -> {
-            for (Map.Entry<String, Double> e : r.entrySet()) {
-              l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue());
-            }
-            return l;
-          }));
+          for (Map.Entry<String, Double> e : r.entrySet()) {
+            l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue());
+          }
+          return l;
+        }));
       } catch (IOException e) {
         failedParses++;
         LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
@@ -180,27 +201,129 @@ public class ServerSegmentMetadataReader {
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocId 
metadata for a list of segments of the given table.
+   * This method will query the servers hosting the given segments and 
aggregate the validDocId metadata results.
+   *
+   * @return a list of ValidDocIdMetadataInfo for the queried segments
+   */
+  public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String 
tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs) {
+    List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
+      List<String> segmentsForServer = serverToSegments.getValue();
+      List<String> segmentsToQuery = new ArrayList<>();
+      if (segmentNames == null || segmentNames.isEmpty()) {
+        segmentsToQuery.addAll(segmentsForServer);
+      } else {
+        Set<String> segmentNamesLookUpTable = new HashSet<>(segmentNames);
+        for (String segment : segmentsForServer) {
+          if (segmentNamesLookUpTable.contains(segment)) {
+            segmentsToQuery.add(segment);
+          }
+        }
+      }
+      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, 
segmentsToQuery,
+          serverToEndpoints.get(serverToSegments.getKey())));
+    }
+
+    // request the urls from the servers
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverToEndpoints);
+
+    Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
+            timeoutMs, null);
+
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
+    int failedParses = 0;
+    int returnedSegmentsCount = 0;
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        String validDocIdMetadataList = streamResponse.getValue();
+        List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+            JsonUtils.stringToObject(validDocIdMetadataList, new 
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+            });
+        validDocIdMetadataInfos.addAll(validDocIdMetadataInfo);
+        returnedSegmentsCount++;
+      } catch (Exception e) {
+        failedParses++;
+        LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses,
+          serverURLsAndBodies.size());
+    }
+
+    if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) {
+      LOGGER.error("Unable to get validDocIdMetadata from all servers. 
Expected: {}, Actual: {}", segmentNames.size(),
+          returnedSegmentsCount);
+    }
+    LOGGER.info("Retrieved valid doc id metadata for {} segments from {} 
servers.", returnedSegmentsCount,
+        serverURLsAndBodies.size());
+    return validDocIdMetadataInfos;
+  }
+
+  /**
+   * This method is called when the API request is to fetch validDocIds for a 
segment of the given table. This method
+   * will query the given server endpoint hosting the target segment and 
fetch the validDocIds result.
+   *
+   * @return a bitmap of validDocIds
+   */
+  public RoaringBitmap getValidDocIdsFromServer(String tableNameWithType, 
String segmentName, String endpoint,
+      int timeoutMs) {
+    // Build the endpoint url
+    String url = generateValidDocIdsURL(tableNameWithType, segmentName, 
endpoint);
+
+    // Set timeout
+    ClientConfig clientConfig = new ClientConfig();
+    clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMs);
+    clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMs);
+
+    Response response = 
ClientBuilder.newClient(clientConfig).target(url).request().get(Response.class);
+    Preconditions.checkState(response.getStatus() == 
Response.Status.OK.getStatusCode(),
+        "Unable to retrieve validDocIds from %s", url);
+    byte[] validDocIds = response.readEntity(byte[].class);
+    return RoaringBitmapUtils.deserialize(validDocIds);
+  }
+
   private String generateAggregateSegmentMetadataServerURL(String 
tableNameWithType, List<String> columns,
       String endpoint) {
-    try {
-      tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8.name());
-      String paramsStr = generateColumnsParam(columns);
-      return String.format("%s/tables/%s/metadata?%s", endpoint, 
tableNameWithType, paramsStr);
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e.getCause());
-    }
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    String paramsStr = generateColumnsParam(columns);
+    return String.format("%s/tables/%s/metadata?%s", endpoint, 
tableNameWithType, paramsStr);
   }
 
   private String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, List<String> columns,
       String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
+    String paramsStr = generateColumnsParam(columns);
+    return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, 
tableNameWithType, segmentName, paramsStr);
+  }
+
+  private String generateValidDocIdsURL(String tableNameWithType, String 
segmentName, String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
+    return String.format("%s/segments/%s/%s/validDocIds", endpoint, 
tableNameWithType, segmentName);
+  }
+
+  private Pair<String, String> generateValidDocIdMetadataURL(String 
tableNameWithType, List<String> segmentNames,
+      String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    TableSegments tableSegments = new TableSegments(segmentNames);
+    String jsonTableSegments;
     try {
-      tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8.name());
-      segmentName = URLEncoder.encode(segmentName, 
StandardCharsets.UTF_8.name());
-      String paramsStr = generateColumnsParam(columns);
-      return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, 
tableNameWithType, segmentName, paramsStr);
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e.getCause());
+      jsonTableSegments = JsonUtils.objectToString(tableSegments);
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Failed to convert segment names to json request body: 
segmentNames={}", segmentNames);
+      throw new RuntimeException(e);
     }
+    return Pair.of(
+        String.format("%s/tables/%s/validDocIdMetadata", endpoint, 
tableNameWithType), jsonTableSegments);
   }
 
   private String generateColumnsParam(List<String> columns) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 7ef5302c1b..f92747b497 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -81,8 +82,9 @@ public class TableMetadataReader {
       }
     }
 
-    List<String> segmentsMetadata = serverSegmentMetadataReader
-        .getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap, 
endpoints, columns, timeoutMs);
+    List<String> segmentsMetadata =
+        
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, endpoints,
+            columns, timeoutMs);
     Map<String, JsonNode> response = new HashMap<>();
     for (String segmentMetadata : segmentsMetadata) {
       JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
@@ -146,8 +148,28 @@ public class TableMetadataReader {
     ServerSegmentMetadataReader serverSegmentMetadataReader =
         new ServerSegmentMetadataReader(_executor, _connectionManager);
 
-    TableMetadataInfo aggregateTableMetadataInfo = serverSegmentMetadataReader
-        .getAggregatedTableMetadataFromServer(tableNameWithType, endpoints, 
columns, numReplica, timeoutMs);
+    TableMetadataInfo aggregateTableMetadataInfo =
+        
serverSegmentMetadataReader.getAggregatedTableMetadataFromServer(tableNameWithType,
 endpoints, columns,
+            numReplica, timeoutMs);
+    return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
+  }
+
+  /**
+   * This method retrieves the aggregated valid doc id metadata for a given 
table.
+   * @return a list of ValidDocIdMetadataInfo
+   */
+  public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, 
List<String> segmentNames, int timeoutMs)
+      throws InvalidConfigException {
+    final Map<String, List<String>> serverToSegments =
+        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    BiMap<String, String> endpoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    ServerSegmentMetadataReader serverSegmentMetadataReader =
+        new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+    List<ValidDocIdMetadataInfo> aggregateTableMetadataInfo =
+        
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, 
serverToSegments, endpoints,
+            segmentNames, timeoutMs);
     return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
   }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index a46f903ed9..31f5d6039c 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -21,15 +21,23 @@ package org.apache.pinot.plugin.minion.tasks;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,4 +138,37 @@ public class MinionTaskUtils {
     }
     return dirInStr;
   }
+
+  public static RoaringBitmap getValidDocIds(String tableNameWithType, String 
segmentName, Map<String, String> configs,
+      MinionContext minionContext) {
+    HelixAdmin helixAdmin = 
minionContext.getHelixManager().getClusterManagmentTool();
+    String clusterName = minionContext.getHelixManager().getClusterName();
+
+    String server = getServer(segmentName, tableNameWithType, helixAdmin, 
clusterName);
+    InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, 
server);
+    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+
+    // Fetch the validDocIds bitmap for the segment from the server that 
hosts it in ONLINE state.
+    // The endpoint is resolved above via the Helix instance config.
+    ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader();
+    return 
serverSegmentMetadataReader.getValidDocIdsFromServer(tableNameWithType, 
segmentName, endpoint, 60_000);
+  }
+
+  public static String getServer(String segmentName, String tableNameWithType, 
HelixAdmin helixAdmin,
+      String clusterName) {
+    ExternalView externalView = 
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
+    if (externalView == null) {
+      throw new IllegalStateException("External view does not exist for table: 
" + tableNameWithType);
+    }
+    Map<String, String> instanceStateMap = 
externalView.getStateMap(segmentName);
+    if (instanceStateMap == null) {
+      throw new IllegalStateException("Failed to find segment: " + 
segmentName);
+    }
+    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+      if 
(entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE))
 {
+        return entry.getKey();
+      }
+    }
+    throw new IllegalStateException("Failed to find ONLINE server for segment: 
" + segmentName);
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index aa37ac871a..4c200b9606 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -18,118 +18,28 @@
  */
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.http.client.utils.URIBuilder;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import 
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import 
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.roaringbitmap.PeekableIntIterator;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
-  private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
-  private static HelixAdmin _clusterManagementTool = 
_helixManager.getClusterManagmentTool();
-  private static String _clusterName = _helixManager.getClusterName();
-
-  private class CompactedRecordReader implements RecordReader {
-    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
-    private final PeekableIntIterator _validDocIdsIterator;
-    // Reusable generic row to store the next row to return
-    GenericRow _nextRow = new GenericRow();
-    // Flag to mark whether we need to fetch another row
-    boolean _nextRowReturned = true;
-
-    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
-      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
-      _pinotSegmentRecordReader.init(indexDir, null, null);
-      _validDocIdsIterator = validDocIds.getIntIterator();
-    }
-
-    @Override
-    public void init(File dataFile, Set<String> fieldsToRead, @Nullable 
RecordReaderConfig recordReaderConfig) {
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
-        return false;
-      }
-
-      // If next row has not been returned, return true
-      if (!_nextRowReturned) {
-        return true;
-      }
-
-      // Try to get the next row to return
-      if (_validDocIdsIterator.hasNext()) {
-        int docId = _validDocIdsIterator.next();
-        _nextRow.clear();
-        _pinotSegmentRecordReader.getRecord(docId, _nextRow);
-        _nextRowReturned = false;
-        return true;
-      }
-
-      // Cannot find next row to return, return false
-      return false;
-    }
-
-    @Override
-    public GenericRow next() {
-      return next(new GenericRow());
-    }
-
-    @Override
-    public GenericRow next(GenericRow reuse) {
-      Preconditions.checkState(!_nextRowReturned);
-      reuse.init(_nextRow);
-      _nextRowReturned = true;
-      return reuse;
-    }
-
-    @Override
-    public void rewind() {
-      _pinotSegmentRecordReader.rewind();
-      _nextRowReturned = true;
-    }
-
-    @Override
-    public void close()
-        throws IOException {
-      _pinotSegmentRecordReader.close();
-    }
-  }
 
   @Override
   protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, 
File indexDir, File workingDir)
@@ -143,7 +53,7 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
 
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     TableConfig tableConfig = getTableConfig(tableNameWithType);
-    ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, 
configs);
+    RoaringBitmap validDocIds = 
MinionTaskUtils.getValidDocIds(tableNameWithType, segmentName, configs, 
MINION_CONTEXT);
 
     if (validDocIds.isEmpty()) {
       // prevents empty segment generation
@@ -159,7 +69,8 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
     }
 
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-    try (CompactedRecordReader compactedRecordReader = new 
CompactedRecordReader(indexDir, validDocIds)) {
+    try (CompactedPinotSegmentRecordReader compactedRecordReader = new 
CompactedPinotSegmentRecordReader(indexDir,
+        validDocIds)) {
       SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, 
tableConfig, segmentMetadata, segmentName);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
       driver.init(config, compactedRecordReader);
@@ -198,46 +109,6 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
     return config;
   }
 
-  // TODO: Consider moving this method to a more appropriate class (eg 
ServerSegmentMetadataReader)
-  private static ImmutableRoaringBitmap getValidDocIds(String 
tableNameWithType, Map<String, String> configs)
-      throws URISyntaxException {
-    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
-    String server = getServer(segmentName, tableNameWithType);
-
-    // get the url for the validDocIds for the server
-    InstanceConfig instanceConfig = 
_clusterManagementTool.getInstanceConfig(_clusterName, server);
-    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
-    String url =
-        new 
URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", 
tableNameWithType, segmentName))
-            .toString();
-
-    // get the validDocIds from that server
-    Response response = 
ClientBuilder.newClient().target(url).request().get(Response.class);
-    Preconditions.checkState(response.getStatus() == 
Response.Status.OK.getStatusCode(),
-        "Unable to retrieve validDocIds from %s", url);
-    byte[] snapshot = response.readEntity(byte[].class);
-    ImmutableRoaringBitmap validDocIds = new 
ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
-    return validDocIds;
-  }
-
-  @VisibleForTesting
-  public static String getServer(String segmentName, String tableNameWithType) 
{
-    ExternalView externalView = 
_clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType);
-    if (externalView == null) {
-      throw new IllegalStateException("External view does not exist for table: 
" + tableNameWithType);
-    }
-    Map<String, String> instanceStateMap = 
externalView.getStateMap(segmentName);
-    if (instanceStateMap == null) {
-      throw new IllegalStateException("Failed to find segment: " + 
segmentName);
-    }
-    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-      if (entry.getValue().equals(SegmentStateModel.ONLINE)) {
-        return entry.getKey();
-      }
-    }
-    throw new IllegalStateException("Failed to find ONLINE server for segment: 
" + segmentName);
-  }
-
   @Override
   protected SegmentZKMetadataCustomMapModifier 
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
       SegmentConversionResult segmentConversionResult) {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 590102319e..e6eaf3679e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -18,26 +18,20 @@
  */
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.http.client.utils.URIBuilder;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -45,7 +39,6 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +82,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
     for (TableConfig tableConfig : tableConfigs) {
       if (!validate(tableConfig)) {
+        LOGGER.warn("Validation failed for table {}. Skipping..", 
tableConfig.getTableName());
         continue;
       }
 
@@ -103,6 +97,8 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
         continue;
       }
 
+      // TODO: add a check to see if the task is already running for the table
+
       // get server to segment mappings
       PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
       Map<String, List<String>> serverToSegments = 
pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -113,27 +109,21 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
         throw new RuntimeException(e);
       }
 
-      Map<String, SegmentZKMetadata> completedSegmentsMap =
-          
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
+      ServerSegmentMetadataReader serverSegmentMetadataReader =
+          new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
+              _clusterInfoAccessor.getConnectionManager());
 
-      List<String> validDocIdUrls;
-      try {
-        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints, tableNameWithType,
-            completedSegmentsMap.keySet());
-      } catch (URISyntaxException e) {
-        throw new RuntimeException(e);
-      }
+      // TODO: currently, we pass segmentNames=null to get metadata for all 
segments. We can change this to fetch
+      // valid doc id metadata in batches within a loop.
+      List<ValidDocIdMetadataInfo> validDocIdMetadataList =
+          
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, 
serverToSegments,
+              serverToEndpoints, null, 60_000);
 
-      // request the urls from the servers
-      CompletionServiceHelper completionServiceHelper =
-          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), 
_clusterInfoAccessor.getConnectionManager(),
-              serverToEndpoints.inverse());
-
-      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
-          completionServiceHelper.doMultiGetRequest(validDocIdUrls, 
tableNameWithType, false, 3000);
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
 
       SegmentSelectionResult segmentSelectionResult =
-          processValidDocIdMetadata(taskConfigs, completedSegmentsMap, 
serviceResponse._httpResponses.entrySet());
+          processValidDocIdMetadata(taskConfigs, completedSegmentsMap, 
validDocIdMetadataList);
 
       if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
         pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
@@ -163,7 +153,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
 
   @VisibleForTesting
   public static SegmentSelectionResult processValidDocIdMetadata(Map<String, 
String> taskConfigs,
-      Map<String, SegmentZKMetadata> completedSegmentsMap, 
Set<Map.Entry<String, String>> responseSet) {
+      Map<String, SegmentZKMetadata> completedSegmentsMap, 
List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList) {
     double invalidRecordsThresholdPercent = Double.parseDouble(
         
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -172,62 +162,22 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
     List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
     List<String> segmentsForDeletion = new ArrayList<>();
-    for (Map.Entry<String, String> streamResponse : responseSet) {
-      JsonNode allValidDocIdMetadata;
-      try {
-        allValidDocIdMetadata = 
JsonUtils.stringToJsonNode(streamResponse.getValue());
-      } catch (IOException e) {
-        LOGGER.error("Unable to parse validDocIdMetadata response for: {}", 
streamResponse.getKey());
-        continue;
-      }
-      Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
-      while (iterator.hasNext()) {
-        JsonNode validDocIdMetadata = iterator.next();
-        long totalInvalidDocs = 
validDocIdMetadata.get("totalInvalidDocs").asLong();
-        String segmentName = validDocIdMetadata.get("segmentName").asText();
-        SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
-        long totalDocs = validDocIdMetadata.get("totalDocs").asLong();
-        double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) 
* 100;
-        if (totalInvalidDocs == totalDocs) {
-          segmentsForDeletion.add(segment.getSegmentName());
-        } else if (invalidRecordPercent > invalidRecordsThresholdPercent
-            && totalInvalidDocs > invalidRecordsThresholdCount) {
-          segmentsForCompaction.add(segment);
-        }
+    for (ValidDocIdMetadataInfo validDocIdMetadata : 
validDocIdMetadataInfoList) {
+      long totalInvalidDocs = validDocIdMetadata.getTotalInvalidDocs();
+      String segmentName = validDocIdMetadata.getSegmentName();
+      SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+      long totalDocs = validDocIdMetadata.getTotalDocs();
+      double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 
100;
+      if (totalInvalidDocs == totalDocs) {
+        segmentsForDeletion.add(segment.getSegmentName());
+      } else if (invalidRecordPercent > invalidRecordsThresholdPercent
+          && totalInvalidDocs > invalidRecordsThresholdCount) {
+        segmentsForCompaction.add(segment);
       }
     }
     return new SegmentSelectionResult(segmentsForCompaction, 
segmentsForDeletion);
   }
 
-  @VisibleForTesting
-  public static List<String> getValidDocIdMetadataUrls(Map<String, 
List<String>> serverToSegments,
-      BiMap<String, String> serverToEndpoints, String tableNameWithType, 
Set<String> completedSegments)
-      throws URISyntaxException {
-    Set<String> remainingSegments = new HashSet<>(completedSegments);
-    List<String> urls = new ArrayList<>();
-    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
-      if (remainingSegments.isEmpty()) {
-        break;
-      }
-      String server = entry.getKey();
-      List<String> segmentNames = entry.getValue();
-      URIBuilder uriBuilder = new 
URIBuilder(serverToEndpoints.get(server)).setPath(
-          String.format("/tables/%s/validDocIdMetadata", tableNameWithType));
-      int completedSegmentCountPerServer = 0;
-      for (String segmentName : segmentNames) {
-        if (remainingSegments.remove(segmentName)) {
-          completedSegmentCountPerServer++;
-          uriBuilder.addParameter("segmentNames", segmentName);
-        }
-      }
-      if (completedSegmentCountPerServer > 0) {
-        // only add to the list if the server has completed segments
-        urls.add(uriBuilder.toString());
-      }
-    }
-    return urls;
-  }
-
   private List<SegmentZKMetadata> getCompletedSegments(String 
tableNameWithType, Map<String, String> taskConfigs) {
     List<SegmentZKMetadata> completedSegments = new ArrayList<>();
     String bufferPeriod = 
taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
index 604c58f6d0..0869880ccf 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -51,13 +52,15 @@ public class UpsertCompactionTaskExecutorTest {
     
Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool);
     minionContext.setHelixManager(helixManager);
 
-    String server = UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, 
REALTIME_TABLE_NAME);
+    String server = MinionTaskUtils.getServer(SEGMENT_NAME, 
REALTIME_TABLE_NAME, helixManager.getClusterManagmentTool(),
+        helixManager.getClusterName());
 
     Assert.assertEquals(server, "server1");
 
     // verify exception thrown with OFFLINE server
     map.put("server1", SegmentStateModel.OFFLINE);
     Assert.assertThrows(IllegalStateException.class,
-        () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, 
REALTIME_TABLE_NAME));
+        () -> MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME,
+            helixManager.getClusterManagmentTool(), 
helixManager.getClusterName()));
   }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 03908a958e..7da9a7f1e8 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -18,20 +18,17 @@
  */
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import java.net.URISyntaxException;
-import java.util.AbstractMap;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -41,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
@@ -171,57 +169,6 @@ public class UpsertCompactionTaskGeneratorTest {
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
-  @Test
-  public void testGetValidDocIdMetadataUrls()
-      throws URISyntaxException {
-    Map<String, List<String>> serverToSegments = new HashMap<>();
-    serverToSegments.put("server1",
-        Lists.newArrayList(_completedSegment.getSegmentName(), 
_completedSegment2.getSegmentName()));
-    serverToSegments.put("server2", Lists.newArrayList("consumingSegment"));
-    BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
-    serverToEndpoints.put("server1", "http://endpoint1";);
-    serverToEndpoints.put("server2", "http://endpoint2";);
-    Set<String> completedSegments = new HashSet<>();
-    completedSegments.add(_completedSegment.getSegmentName());
-    completedSegments.add(_completedSegment2.getSegmentName());
-
-    List<String> validDocIdUrls =
-        
UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints,
-            REALTIME_TABLE_NAME, completedSegments);
-
-    String expectedUrl =
-        
String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s",
 "http://endpoint1";,
-            REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), 
_completedSegment2.getSegmentName());
-    assertEquals(validDocIdUrls.get(0), expectedUrl);
-    assertEquals(validDocIdUrls.size(), 1);
-  }
-
-  @Test
-  public void testGetValidDocIdMetadataUrlsWithReplicatedSegments()
-      throws URISyntaxException {
-    Map<String, List<String>> serverToSegments = new LinkedHashMap<>();
-    serverToSegments.put("server1",
-        Lists.newArrayList(_completedSegment.getSegmentName(), 
_completedSegment2.getSegmentName()));
-    serverToSegments.put("server2",
-        Lists.newArrayList(_completedSegment.getSegmentName(), 
_completedSegment2.getSegmentName()));
-    BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
-    serverToEndpoints.put("server1", "http://endpoint1";);
-    serverToEndpoints.put("server2", "http://endpoint2";);
-    Set<String> completedSegments = new HashSet<>();
-    completedSegments.add(_completedSegment.getSegmentName());
-    completedSegments.add(_completedSegment2.getSegmentName());
-
-    List<String> validDocIdUrls =
-        
UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, 
serverToEndpoints,
-            REALTIME_TABLE_NAME, completedSegments);
-
-    String expectedUrl =
-        
String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s",
 "http://endpoint1";,
-            REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), 
_completedSegment2.getSegmentName());
-    assertEquals(validDocIdUrls.get(0), expectedUrl);
-    assertEquals(validDocIdUrls.size(), 1);
-  }
-
   @Test
   public void testGetMaxTasks() {
     Map<String, String> taskConfigs = new HashMap<>();
@@ -234,16 +181,20 @@ public class UpsertCompactionTaskGeneratorTest {
   }
 
   @Test
-  public void testProcessValidDocIdMetadata() {
+  public void testProcessValidDocIdMetadata()
+      throws IOException {
     Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
-    Set<Map.Entry<String, String>> responseSet = new HashSet<>();
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList = new 
ArrayList<>();
     String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 
50," + "\"segmentName\" : \""
         + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + 
"}," + "{" + "\"totalValidDocs\" : 0,"
         + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + 
_completedSegment2.getSegmentName() + "\","
         + "\"totalDocs\" : 10" + "}]";
-    responseSet.add(new AbstractMap.SimpleEntry<>("", json));
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+        JsonUtils.stringToObject(json, new 
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+        });
     UpsertCompactionTaskGenerator.SegmentSelectionResult 
segmentSelectionResult =
-        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap, responseSet);
+        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap,
+            validDocIdMetadataInfo);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
@@ -251,20 +202,23 @@ public class UpsertCompactionTaskGeneratorTest {
     // test with a higher invalidRecordsThresholdPercent
     compactionConfigs = getCompactionConfigs("60", "10");
     segmentSelectionResult =
-        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap, responseSet);
+        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap,
+            validDocIdMetadataInfo);
     assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
 
     // test without an invalidRecordsThresholdPercent
     compactionConfigs = getCompactionConfigs("0", "10");
     segmentSelectionResult =
-        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap, responseSet);
+        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap,
+            validDocIdMetadataInfo);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
 
     // test without a invalidRecordsThresholdCount
     compactionConfigs = getCompactionConfigs("30", "0");
     segmentSelectionResult =
-        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap, responseSet);
+        
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, 
_completedSegmentsMap,
+            validDocIdMetadataInfo);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
   }
@@ -284,7 +238,7 @@ public class UpsertCompactionTaskGeneratorTest {
   private IdealState getIdealState(String tableName, List<String> 
segmentNames) {
     IdealState idealState = new IdealState(tableName);
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
-    for (String segmentName: segmentNames) {
+    for (String segmentName : segmentNames) {
       idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
     }
     return idealState;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
new file mode 100644
index 0000000000..2795982ab6
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Compacted Pinot Segment Record Reader used for upsert compaction
+ */
+public class CompactedPinotSegmentRecordReader implements RecordReader {
+  private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+  private final PeekableIntIterator _validDocIdsIterator;
+  // Reusable generic row to store the next row to return
+  GenericRow _nextRow = new GenericRow();
+  // Flag to mark whether we need to fetch another row
+  boolean _nextRowReturned = true;
+
+  public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap 
validDocIds) {
+    _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+    _pinotSegmentRecordReader.init(indexDir, null, null);
+    _validDocIdsIterator = validDocIds.getIntIterator();
+  }
+
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
+      return false;
+    }
+
+    // If next row has not been returned, return true
+    if (!_nextRowReturned) {
+      return true;
+    }
+
+    // Try to get the next row to return
+    if (_validDocIdsIterator.hasNext()) {
+      int docId = _validDocIdsIterator.next();
+      _nextRow.clear();
+      _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+      _nextRowReturned = false;
+      return true;
+    }
+
+    // Cannot find next row to return, return false
+    return false;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    Preconditions.checkState(!_nextRowReturned);
+    reuse.init(_nextRow);
+    _nextRowReturned = true;
+    return reuse;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _pinotSegmentRecordReader.rewind();
+    _nextRowReturned = true;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _pinotSegmentRecordReader.close();
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b08833b966..29f90715da 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -529,31 +529,64 @@ public class TablesResource {
   public String getValidDocIdMetadata(
       @ApiParam(value = "Table name including type", required = true, example 
= "myTable_REALTIME")
       @PathParam("tableNameWithType") String tableNameWithType,
-      @ApiParam(value = "Segment name", allowMultiple = true, required = true) 
@QueryParam("segmentNames")
+      @ApiParam(value = "Segment name", allowMultiple = true) 
@QueryParam("segmentNames")
       List<String> segmentNames) {
+    return 
ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, 
segmentNames));
+  }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/validDocIdMetadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provides segment validDocId metadata", notes = 
"Provides segment validDocId metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = 
ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", 
response = ErrorInfo.class)
+  })
+  public String getValidDocIdMetadata(
+      @ApiParam(value = "Table name including type", required = true, example 
= "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType, TableSegments 
tableSegments) {
+    List<String> segmentNames = tableSegments.getSegments();
+    return 
ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, 
segmentNames));
+  }
+
+  private List<Map<String, Object>> processValidDocIdMetadata(String 
tableNameWithType, List<String> segments) {
     TableDataManager tableDataManager =
         ServerResourceUtils.checkGetTableDataManager(_serverInstance, 
tableNameWithType);
     List<String> missingSegments = new ArrayList<>();
-    List<SegmentDataManager> segmentDataManagers = 
tableDataManager.acquireSegments(segmentNames, missingSegments);
-    if (!missingSegments.isEmpty()) {
-      throw new WebApplicationException(String.format("Table %s has missing 
segments", tableNameWithType),
-          Response.Status.NOT_FOUND);
+    List<SegmentDataManager> segmentDataManagers;
+    if (segments == null) {
+      segmentDataManagers = tableDataManager.acquireAllSegments();
+    } else {
+      segmentDataManagers = tableDataManager.acquireSegments(segments, 
missingSegments);
+      if (!missingSegments.isEmpty()) {
+        throw new WebApplicationException(
+            String.format("Table %s has missing segments: %s)", 
tableNameWithType, segments),
+            Response.Status.NOT_FOUND);
+      }
     }
     List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>();
     for (SegmentDataManager segmentDataManager : segmentDataManagers) {
       try {
         IndexSegment indexSegment = segmentDataManager.getSegment();
+        if (indexSegment == null) {
+          LOGGER.warn("Table {} segment {} does not exist", tableNameWithType, 
segmentDataManager.getSegmentName());
+          continue;
+        }
+        // Skip the consuming segments
         if (!(indexSegment instanceof ImmutableSegmentImpl)) {
-          throw new WebApplicationException(
-              String.format("Table %s segment %s is not a immutable segment", 
tableNameWithType,
-                  segmentDataManager.getSegmentName()), 
Response.Status.BAD_REQUEST);
+          String msg = String.format("Table %s segment %s is not a immutable 
segment", tableNameWithType,
+              segmentDataManager.getSegmentName());
+          LOGGER.warn(msg);
+          continue;
         }
         MutableRoaringBitmap validDocIds =
             indexSegment.getValidDocIds() != null ? 
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
         if (validDocIds == null) {
-          throw new WebApplicationException(
-              String.format("Missing validDocIds for table %s segment %s does 
not exist", tableNameWithType,
-                  segmentDataManager.getSegmentName()), 
Response.Status.NOT_FOUND);
+          String msg = String.format("Missing validDocIds for table %s segment 
%s does not exist", tableNameWithType,
+              segmentDataManager.getSegmentName());
+          LOGGER.warn(msg);
+          throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
         }
         Map<String, Object> validDocIdMetadata = new HashMap<>();
         int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
@@ -568,7 +601,7 @@ public class TablesResource {
         tableDataManager.releaseSegment(segmentDataManager);
       }
     }
-    return ResourceUtils.convertToJsonString(allValidDocIdMetadata);
+    return allValidDocIdMetadata;
   }
 
   /**
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 0469d805d2..79b17396de 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.Response;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
@@ -311,6 +312,27 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 
99992);
   }
 
+  @Test
+  public void testValidDocIdMetadataPost()
+      throws IOException {
+    IndexSegment segment = _realtimeIndexSegments.get(0);
+    // Verify the content of the downloaded snapshot from a realtime table.
+    
downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) segment);
+    List<String> segments = List.of(segment.getSegmentName());
+    TableSegments tableSegments = new TableSegments(segments);
+    String validDocIdMetadataPath =
+        "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + 
"/validDocIdMetadata";
+    String response =
+        _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames", 
segment.getSegmentName()).request()
+            .post(Entity.json(tableSegments), String.class);
+    JsonNode validDocIdMetadata = JsonUtils.stringToJsonNode(response).get(0);
+
+    Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
+    Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
+    Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 
99992);
+  }
+
   // Verify metadata file from segments.
   private void downLoadAndVerifySegmentContent(String tableNameWithType, 
IndexSegment segment)
       throws IOException, ConfigurationException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to