This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ae06f499 Fixing the bug for Upsert compaction task generator (#12380)
f6ae06f499 is described below

commit f6ae06f499e2fc18934b1ee605d50b7d9006e46b
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Wed Feb 7 09:09:05 2024 -0800

    Fixing the bug for Upsert compaction task generator (#12380)
    
    - The task generator threw a NullPointerException when a segment
      reported in the validDocIds metadata was not present in the
      completed segments' ZK metadata. Such segments are now skipped
      for compaction with a warning.
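    - Added a default value (SNAPSHOT) for the validDocIdsType parameter
      of the aggregate valid doc id metadata API
    - Skipped tables whose task config is null instead of failing
    - Made getCompletedSegments a static, testable helper that takes the
      segment list and current time as arguments
    - Added unit tests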
---
 .../api/resources/PinotTableRestletResource.java   |  3 +-
 .../UpsertCompactionTaskGenerator.java             | 27 ++++++++---
 .../UpsertCompactionTaskGeneratorTest.java         | 53 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 7 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index aedc6a7ef8..c0f11a412b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -963,7 +963,7 @@ public class PinotTableRestletResource {
       @ApiParam(value = "A list of segments", allowMultiple = true) 
@QueryParam("segmentNames")
       List<String> segmentNames,
       @ApiParam(value = "Valid doc ids type")
-      @QueryParam("validDocIdsType") ValidDocIdsType validDocIdsType) {
+      @QueryParam("validDocIdsType") @DefaultValue("SNAPSHOT") ValidDocIdsType 
validDocIdsType) {
     LOGGER.info("Received a request to fetch aggregate valid doc id metadata 
for a table {}", tableName);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == TableType.OFFLINE) {
@@ -977,6 +977,7 @@ public class PinotTableRestletResource {
     try {
       TableMetadataReader tableMetadataReader =
           new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT : 
validDocIdsType;
       JsonNode segmentsMetadataJson =
           
tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType, 
segmentNames,
               validDocIdsType.toString(), 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 6d21d8417f..de24b8a5a6 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -60,9 +60,9 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
 
   public static class SegmentSelectionResult {
 
-    private List<SegmentZKMetadata> _segmentsForCompaction;
+    private final List<SegmentZKMetadata> _segmentsForCompaction;
 
-    private List<String> _segmentsForDeletion;
+    private final List<String> _segmentsForDeletion;
 
     SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, 
List<String> segmentsForDeletion) {
       _segmentsForCompaction = segmentsForCompaction;
@@ -96,8 +96,17 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
       String tableNameWithType = tableConfig.getTableName();
       LOGGER.info("Start generating task configs for table: {}", 
tableNameWithType);
 
+      if (tableConfig.getTaskConfig() == null) {
+        LOGGER.warn("Task config is null for table: {}", tableNameWithType);
+        continue;
+      }
+
       Map<String, String> taskConfigs = 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
-      List<SegmentZKMetadata> completedSegments = 
getCompletedSegments(tableNameWithType, taskConfigs);
+      List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+      // Get completed segments and filter out the segments based on the 
buffer time configuration
+      List<SegmentZKMetadata> completedSegments =
+          getCompletedSegments(taskConfigs, allSegments, 
System.currentTimeMillis());
 
       if (completedSegments.isEmpty()) {
         LOGGER.info("No completed segments were eligible for compaction for 
table: {}", tableNameWithType);
@@ -211,6 +220,11 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
 
       // Skip segments if the crc from zk metadata and server does not match. 
They may be being reloaded.
       SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+      if (segment == null) {
+        LOGGER.warn("Segment {} is not found in the completed segments list, 
skipping it for compaction", segmentName);
+        continue;
+      }
+
       if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
         LOGGER.warn(
             "CRC mismatch for segment: {}, skipping it for compaction 
(segmentZKMetadata={}, validDocIdsMetadata={})",
@@ -229,15 +243,16 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
     return new SegmentSelectionResult(segmentsForCompaction, 
segmentsForDeletion);
   }
 
-  private List<SegmentZKMetadata> getCompletedSegments(String 
tableNameWithType, Map<String, String> taskConfigs) {
+  @VisibleForTesting
+  public static List<SegmentZKMetadata> getCompletedSegments(Map<String, 
String> taskConfigs,
+      List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
     List<SegmentZKMetadata> completedSegments = new ArrayList<>();
     String bufferPeriod = 
taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
     long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
-    List<SegmentZKMetadata> allSegments = 
getSegmentsZKMetadataForTable(tableNameWithType);
     for (SegmentZKMetadata segment : allSegments) {
       CommonConstants.Segment.Realtime.Status status = segment.getStatus();
       // initial segments selection based on status and age
-      if (status.isCompleted() && (segment.getEndTimeMs() <= 
(System.currentTimeMillis() - bufferMs))) {
+      if (status.isCompleted() && (segment.getEndTimeMs() <= 
(currentTimeInMillis - bufferMs))) {
         completedSegments.add(segment);
       }
     }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index d74b03e815..5aacba1f93 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -181,6 +181,52 @@ public class UpsertCompactionTaskGeneratorTest {
     assertEquals(maxTasks, 10);
   }
 
+  @Test
+  public void testGetCompletedSegments() {
+    long currentTimeInMillis = System.currentTimeMillis();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "1d");
+
+    SegmentZKMetadata metadata1 = new SegmentZKMetadata("testTable");
+    metadata1.setEndTime(1694198844776L);
+    metadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata1.setTimeUnit(TimeUnit.MILLISECONDS);
+    SegmentZKMetadata metadata2 = new SegmentZKMetadata("testTable");
+    metadata2.setEndTime(1699639830678L);
+    metadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata2.setTimeUnit(TimeUnit.MILLISECONDS);
+
+    SegmentZKMetadata metadata3 = new SegmentZKMetadata("testTable");
+    metadata3.setEndTime(currentTimeInMillis);
+    metadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata3.setTimeUnit(TimeUnit.MILLISECONDS);
+
+    List<SegmentZKMetadata> segmentZKMetadataList = new ArrayList<>();
+    segmentZKMetadataList.add(metadata1);
+    segmentZKMetadataList.add(metadata2);
+    segmentZKMetadataList.add(metadata3);
+
+    List<SegmentZKMetadata> result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, 
segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 2);
+
+    SegmentZKMetadata metadata4 = new SegmentZKMetadata("testTable");
+    metadata4.setEndTime(currentTimeInMillis - 
TimeUtils.convertPeriodToMillis("2d") + 1);
+    metadata4.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata4.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentZKMetadataList.add(metadata4);
+
+    result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, 
segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 3);
+
+    // Check the boundary condition for buffer time period based filtering
+    taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "2d");
+    result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, 
segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 2);
+  }
+
   @Test
   public void testProcessValidDocIdsMetadata()
       throws IOException {
@@ -190,10 +236,17 @@ public class UpsertCompactionTaskGeneratorTest {
         + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 
0," + "\"totalInvalidDocs\" : 10,"
         + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", 
" + "\"segmentCrc\" : \""
         + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
+
     List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
         JsonUtils.stringToObject(json, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
         });
+
     UpsertCompactionTaskGenerator.SegmentSelectionResult 
segmentSelectionResult =
+        
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new 
HashMap<>(),
+            validDocIdsMetadataInfo);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
+
+    segmentSelectionResult =
         
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, 
_completedSegmentsMap,
             validDocIdsMetadataInfo);
     
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to