This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fad53a344 Make upsert compaction task more robust to crc mismatch (#13489)
1fad53a344 is described below

commit 1fad53a344398593cdff4271a2d472cc16cf24d4
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Fri Jul 12 20:21:42 2024 +0530

    Make upsert compaction task more robust to crc mismatch (#13489)
    
    * Allow upsert compaction to work properly during schema / indexing updates
    
    * iterate through all servers
    
    * improve upsert compaction task generator logic
    
    * address comments - refactor
    
    * address comments - more refactoring
    
    * address comments -- undo some refactoring
    
    * address minor comments
---
 .../util/ServerSegmentMetadataReader.java          | 28 ++++++--
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 81 +++++++++++++++-------
 .../UpsertCompactionTaskExecutor.java              | 41 +++++------
 .../UpsertCompactionTaskGenerator.java             | 51 +++++++-------
 .../UpsertCompactionTaskGeneratorTest.java         | 52 ++++++++------
 5 files changed, 153 insertions(+), 100 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 9c2ffa6196..b3fd851ff4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
@@ -215,14 +216,30 @@ public class ServerSegmentMetadataReader {
 
   /**
    * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
-   * This method will pick a server that hosts the target segment and fetch 
the segment metadata result.
+   * For each target segment, this method returns the metadata from one (arbitrary) server that hosts it.
    *
-   * @return segment metadata as a JSON string
+   * @return list of valid doc id metadata, one per segment processed.
    */
   public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String 
tableNameWithType,
       Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
       @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType,
       int numSegmentsBatchPerServerRequest) {
+    return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, serverToEndpoints,
+        segmentNames, timeoutMs, validDocIdsType, 
numSegmentsBatchPerServerRequest).values().stream()
+        .filter(list -> list != null && !list.isEmpty()).map(list -> 
list.get(0)).collect(Collectors.toList());
+  }
+
+  /**
+   * This method is called when the API request is to fetch validDocId metadata for a list of segments of the given table.
+   * This method queries all servers that host the target segments, fetches their segment metadata results and
+   * returns them grouped by segment.
+   *
+   * @return map of segment name to the list of valid doc id metadata, with one element per server that returned it.
+   */
+  public Map<String, List<ValidDocIdsMetadataInfo>> 
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType,
+      int numSegmentsBatchPerServerRequest) {
     List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
     for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
       List<String> segmentsForServer = serverToSegments.getValue();
@@ -256,7 +273,7 @@ public class ServerSegmentMetadataReader {
         completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, true, requestHeaders,
             timeoutMs, null);
 
-    Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new 
HashMap<>();
+    Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfos = new 
HashMap<>();
     int failedParses = 0;
     int returnedServerRequestsCount = 0;
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
@@ -266,7 +283,8 @@ public class ServerSegmentMetadataReader {
             JsonUtils.stringToObject(validDocIdsMetadataList, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
             });
         for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : 
validDocIdsMetadataInfoList) {
-          
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), 
validDocIdsMetadataInfo);
+          
validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(),
 k -> new ArrayList<>())
+              .add(validDocIdsMetadataInfo);
         }
         returnedServerRequestsCount++;
       } catch (Exception e) {
@@ -292,7 +310,7 @@ public class ServerSegmentMetadataReader {
 
     LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server 
requests.",
         validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
-    return new ArrayList<>(validDocIdsMetadataInfos.values());
+    return validDocIdsMetadataInfos;
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 9b7dad1955..55dfb97f98 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -23,10 +23,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
@@ -42,6 +44,7 @@ import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,32 +146,6 @@ public class MinionTaskUtils {
     return dirInStr;
   }
 
-  public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String 
tableNameWithType, String segmentName,
-      String validDocIdsType, MinionContext minionContext) {
-    HelixAdmin helixAdmin = 
minionContext.getHelixManager().getClusterManagmentTool();
-    String clusterName = minionContext.getHelixManager().getClusterName();
-
-    List<String> servers = getServers(segmentName, tableNameWithType, 
helixAdmin, clusterName);
-    for (String server : servers) {
-      InstanceConfig instanceConfig = 
helixAdmin.getInstanceConfig(clusterName, server);
-      String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
-
-      // We only need aggregated table size and the total number of docs/rows. 
Skipping column related stats, by
-      // passing an empty list.
-      ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader();
-      try {
-        return 
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, 
segmentName, endpoint,
-            validDocIdsType, 60_000);
-      } catch (Exception e) {
-        LOGGER.warn(
-            String.format("Unable to retrieve validDocIds bitmap for segment: 
%s from endpoint: %s", segmentName,
-                endpoint), e);
-      }
-    }
-    throw new IllegalStateException(
-        String.format("Unable to retrieve validDocIds bitmap for segment: %s 
from servers: %s", segmentName, servers));
-  }
-
   public static List<String> getServers(String segmentName, String 
tableNameWithType, HelixAdmin helixAdmin,
       String clusterName) {
     ExternalView externalView = 
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
@@ -206,4 +183,56 @@ public class MinionTaskUtils {
     }
     return defaultValue;
   }
+
+  /**
+   * Returns the validDocIds bitmap from the first server whose local segment crc matches expectedCrc (the crc of both
+   * the ZK metadata and the deepstore copy), or {@code null} if no server returns a bitmap with a matching crc.
+   */
+  @Nullable
+  public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String 
tableNameWithType, String segmentName,
+      String validDocIdsType, MinionContext minionContext, String expectedCrc) 
{
+    String clusterName = minionContext.getHelixManager().getClusterName();
+    HelixAdmin helixAdmin = 
minionContext.getHelixManager().getClusterManagmentTool();
+    RoaringBitmap validDocIds = null;
+    List<String> servers = getServers(segmentName, tableNameWithType, 
helixAdmin, clusterName);
+    for (String server : servers) {
+      InstanceConfig instanceConfig = 
helixAdmin.getInstanceConfig(clusterName, server);
+      String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+
+      // Fetch the validDocIds bitmap together with the server's local segment crc; the crc is checked below
+      // before the bitmap is used.
+      ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader();
+      ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+      try {
+        validDocIdsBitmapResponse =
+            
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, 
segmentName, endpoint,
+                validDocIdsType, 60_000);
+      } catch (Exception e) {
+        LOGGER.warn(
+            String.format("Unable to retrieve validDocIds bitmap for segment: 
%s from endpoint: %s", segmentName,
+                endpoint), e);
+        continue;
+      }
+
+      // Check crc from the downloaded segment against the crc returned from 
the server along with the valid doc id
+      // bitmap. If this doesn't match, this means that we are hitting the 
race condition where the segment has been
+      // uploaded successfully while the server is still reloading the 
segment. Reloading can take a while when the
+      // offheap upsert is used because we will need to delete & add all 
primary keys.
+      // `BaseSingleSegmentConversionExecutor.executeTask()` already checks 
for the crc from the task generator
+      // against the crc from the current segment zk metadata, so we don't 
need to check that here.
+      String crcFromValidDocIdsBitmap = 
validDocIdsBitmapResponse.getSegmentCrc();
+      if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
+        // In this scenario, we are hitting the other replica of the segment 
which did not commit to ZK or deepstore.
+        // We will skip processing this bitmap to query other server to 
confirm if there is a valid matching CRC.
+        String message = String.format("CRC mismatch for segment: %s, expected 
value based on task generator: %s, "
+                + "actual crc from validDocIdsBitmapResponse from endpoint %s: 
%s", segmentName, expectedCrc, endpoint,
+            crcFromValidDocIdsBitmap);
+        LOGGER.warn(message);
+        continue;
+      }
+      validDocIds = 
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+      break;
+    }
+    return validDocIds;
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index ec5cc127d9..e683214f43 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -23,9 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
-import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import 
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
@@ -60,31 +58,28 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
 
     String validDocIdsTypeStr =
         
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, 
ValidDocIdsType.SNAPSHOT.name());
-    ValidDocIdsType validDocIdsType = 
ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
-    ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
-        MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, 
validDocIdsType.toString(),
-            MINION_CONTEXT);
-
-    // Check crc from the downloaded segment against the crc returned from the 
server along with the valid doc id
-    // bitmap. If this doesn't match, this means that we are hitting the race 
condition where the segment has been
-    // uploaded successfully while the server is still reloading the segment. 
Reloading can take a while when the
-    // offheap upsert is used because we will need to delete & add all primary 
keys.
-    // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for 
the crc from the task generator
-    // against the crc from the current segment zk metadata, so we don't need 
to check that here.
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
     String originalSegmentCrcFromTaskGenerator = 
configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
     String crcFromDeepStorageSegment = segmentMetadata.getCrc();
-    String crcFromValidDocIdsBitmap = 
validDocIdsBitmapResponse.getSegmentCrc();
-    if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
-        || 
!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
-      LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from 
server: {}", segmentName,
-          crcFromDeepStorageSegment, 
validDocIdsBitmapResponse.getSegmentCrc());
-      return new 
SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
-          .build();
+    if 
(!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
+      String message = String.format("Crc mismatched between ZK and deepstore 
copy of segment: %s. Expected crc "
+              + "from ZK: %s, crc from deepstore: %s", segmentName, 
originalSegmentCrcFromTaskGenerator,
+          crcFromDeepStorageSegment);
+      LOGGER.error(message);
+      throw new IllegalStateException(message);
+    }
+    RoaringBitmap validDocIds =
+        MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
segmentName, validDocIdsTypeStr,
+            MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
+    if (validDocIds == null) {
+      // no valid crc match found or no validDocIds obtained from all servers
+      // error out the task instead of silently failing so that we can track 
it via task-error metrics
+      String message = String.format("No validDocIds found from all servers. 
They either failed to download "
+              + "or did not match crc from segment copy obtained from 
deepstore / servers. " + "Expected crc: %s",
+          originalSegmentCrcFromTaskGenerator);
+      LOGGER.error(message);
+      throw new IllegalStateException(message);
     }
-
-    RoaringBitmap validDocIds = 
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
-
     if (validDocIds.isEmpty()) {
       // prevents empty segment generation
       LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: 
{}", tableNameWithType, segmentName);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 0357bca6fe..64cbe03fe8 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -165,8 +165,8 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
                 validDocIdsType));
       }
 
-      List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
-          
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, 
serverToSegments,
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+          
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
 serverToSegments,
               serverToEndpoints, null, 60_000, validDocIdsType.toString(), 
numSegmentsBatchPerServerRequest);
 
       Map<String, SegmentZKMetadata> completedSegmentsMap =
@@ -209,7 +209,8 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
 
   @VisibleForTesting
   public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, 
String> taskConfigs,
-      Map<String, SegmentZKMetadata> completedSegmentsMap, 
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList) {
+      Map<String, SegmentZKMetadata> completedSegmentsMap,
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
     double invalidRecordsThresholdPercent = Double.parseDouble(
         
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -218,30 +219,31 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
     List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new 
ArrayList<>();
     List<String> segmentsForDeletion = new ArrayList<>();
-    for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoList) {
-      long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
-      String segmentName = validDocIdsMetadata.getSegmentName();
-
-      // Skip segments if the crc from zk metadata and server does not match. 
They may be being reloaded.
-      SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
-      if (segment == null) {
+    for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
+      // check if segment is part of completed segments
+      if (!completedSegmentsMap.containsKey(segmentName)) {
         LOGGER.warn("Segment {} is not found in the completed segments list, 
skipping it for compaction", segmentName);
         continue;
       }
+      SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
+        long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
 
-      if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
-        LOGGER.warn(
-            "CRC mismatch for segment: {}, skipping it for compaction 
(segmentZKMetadata={}, validDocIdsMetadata={})",
-            segmentName, segment.getCrc(), 
validDocIdsMetadata.getSegmentCrc());
-        continue;
-      }
-      long totalDocs = validDocIdsMetadata.getTotalDocs();
-      double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 
100;
-      if (totalInvalidDocs == totalDocs) {
-        segmentsForDeletion.add(segment.getSegmentName());
-      } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
-          && totalInvalidDocs >= invalidRecordsThresholdCount) {
-        segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+        // Skip this server's metadata if its crc does not match ZK (the segment may be reloading there); try the next.
+        if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
+          LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, 
validDocIdsMetadata={})", segmentName,
+              segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
+          continue;
+        }
+        long totalDocs = validDocIdsMetadata.getTotalDocs();
+        double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) 
* 100;
+        if (totalInvalidDocs == totalDocs) {
+          segmentsForDeletion.add(segment.getSegmentName());
+        } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
+            && totalInvalidDocs >= invalidRecordsThresholdCount) {
+          segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+        }
+        break;
       }
     }
     segmentsForCompaction.sort((o1, o2) -> {
@@ -254,8 +256,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
     });
 
     return new SegmentSelectionResult(
-        
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
-        segmentsForDeletion);
+        
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
 segmentsForDeletion);
   }
 
   @VisibleForTesting
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 971a288f82..ec8d8ea786 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -82,6 +82,7 @@ public class UpsertCompactionTaskGeneratorTest {
     _completedSegment.setEndTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("1d"));
     _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
     _completedSegment.setTotalDocs(100L);
+    _completedSegment.setCrc(1000);
 
     _completedSegment2 = new SegmentZKMetadata("testTable__1");
     _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
@@ -89,6 +90,7 @@ public class UpsertCompactionTaskGeneratorTest {
     _completedSegment2.setEndTime(System.currentTimeMillis());
     _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
     _completedSegment2.setTotalDocs(10L);
+    _completedSegment2.setCrc(2000);
 
     _completedSegmentsMap = new HashMap<>();
     _completedSegmentsMap.put(_completedSegment.getSegmentName(), 
_completedSegment);
@@ -231,24 +233,27 @@ public class UpsertCompactionTaskGeneratorTest {
   public void testProcessValidDocIdsMetadata()
       throws IOException {
     Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
-    String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 
50," + "\"segmentName\" : \""
-        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + 
", \"segmentCrc\": \""
-        + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 
0," + "\"totalInvalidDocs\" : 10,"
-        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", 
" + "\"segmentCrc\" : \""
-        + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
-
-    List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
-        JsonUtils.stringToObject(json, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+    String json = "{\"testTable__0\": [{\"totalValidDocs\": 50, 
\"totalInvalidDocs\": 50, "
+        + "\"segmentName\": \"testTable__0\", \"totalDocs\": 100, 
\"segmentCrc\": \"1000\"}], "
+        + "\"testTable__1\": [{\"totalValidDocs\": 0, "
+        + "\"totalInvalidDocs\": 10, \"segmentName\": \"testTable__1\", 
\"totalDocs\": 10, \"segmentCrc\": \"2000\"}]}";
+
+    Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfo =
+        JsonUtils.stringToObject(json, new TypeReference<>() {
         });
 
+    // no completed segments scenario, there shouldn't be any segment selected 
for compaction
     UpsertCompactionTaskGenerator.SegmentSelectionResult 
segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new 
HashMap<>(),
             validDocIdsMetadataInfo);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
 
+    // test with valid crc and thresholds
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
             validDocIdsMetadataInfo);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
@@ -259,6 +264,7 @@ public class UpsertCompactionTaskGeneratorTest {
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
             validDocIdsMetadataInfo);
     assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
 
     // test without an invalidRecordsThresholdPercent
@@ -266,6 +272,8 @@ public class UpsertCompactionTaskGeneratorTest {
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
             validDocIdsMetadataInfo);
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
@@ -275,18 +283,19 @@ public class UpsertCompactionTaskGeneratorTest {
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
             validDocIdsMetadataInfo);
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), 
_completedSegment2.getSegmentName());
 
     // Test the case where the completedSegment from api has different crc 
than segment from zk metadata.
-    json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + 
"\"segmentName\" : \""
-        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + 
", \"segmentCrc\": \""
-        + "1234567890" + "\"}," + "{" + "\"totalValidDocs\" : 0," + 
"\"totalInvalidDocs\" : 10,"
-        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", 
" + "\"segmentCrc\" : \""
-        + _completedSegment2.getCrc() + "\","
-        + "\"totalDocs\" : 10" + "}]";
-    validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+    json = "{\"" + _completedSegment.getSegmentName() + "\": 
[{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, "
+        + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\", 
\"totalDocs\": 100, \"segmentCrc\": "
+        + "\"1234567890\"}], \"" + _completedSegment2.getSegmentName() + "\": 
[{\"totalValidDocs\": 0, "
+        + "\"totalInvalidDocs\": 10, \"segmentName\": \"" + 
_completedSegment2.getSegmentName() + "\", "
+        + "\"segmentCrc\": \"" + _completedSegment2.getCrc() + "\", 
\"totalDocs\": 10}]}";
+    validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new 
TypeReference<>() {
     });
     segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
@@ -301,12 +310,13 @@ public class UpsertCompactionTaskGeneratorTest {
         _completedSegment2.getSegmentName());
 
     // check if both the candidates for compaction are coming in sorted 
descending order
-    json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + 
"\"segmentName\" : \""
-        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + 
", \"segmentCrc\": \""
-        + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 
10," + "\"totalInvalidDocs\" : 40,"
-        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", 
" + "\"segmentCrc\" : \""
-        + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]";
-    validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+    json = "{\"" + _completedSegment.getSegmentName() + "\": 
[{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, "
+        + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\", 
\"totalDocs\": 100, \"segmentCrc\": \""
+        + _completedSegment.getCrc() + "\"}], \"" + 
_completedSegment2.getSegmentName() + "\": "
+        + "[{\"totalValidDocs\": 10, \"totalInvalidDocs\": 40, 
\"segmentName\": \""
+        + _completedSegment2.getSegmentName() + "\", \"segmentCrc\": \"" + 
_completedSegment2.getCrc() + "\", "
+        + "\"totalDocs\": 50}]}";
+    validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new 
TypeReference<>() {
     });
     compactionConfigs = getCompactionConfigs("30", "0");
     segmentSelectionResult =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to