This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e161b786f5 Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461) e161b786f5 is described below commit e161b786f5e4426fcfee1d788d7def46b1d8c24b Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Thu Feb 22 03:23:48 2024 +0530 Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461) --- .../UpsertCompactionTaskGenerator.java | 18 +++++++++++++++--- .../UpsertCompactionTaskGeneratorTest.java | 22 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index de24b8a5a6..76f5c7d939 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.task.TaskState; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -212,7 +213,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { long invalidRecordsThresholdCount = Long.parseLong( taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT))); - 
List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>(); + List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>(); List<String> segmentsForDeletion = new ArrayList<>(); for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) { long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); @@ -237,10 +238,21 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { segmentsForDeletion.add(segment.getSegmentName()); } else if (invalidRecordPercent > invalidRecordsThresholdPercent && totalInvalidDocs > invalidRecordsThresholdCount) { - segmentsForCompaction.add(segment); + segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs)); } } - return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion); + segmentsForCompaction.sort((o1, o2) -> { + if (o1.getValue() > o2.getValue()) { + return -1; + } else if (o1.getValue().equals(o2.getValue())) { + return 0; + } + return 1; + }); + + return new SegmentSelectionResult( + segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), + segmentsForDeletion); } @VisibleForTesting diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 5aacba1f93..971a288f82 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -259,6 +259,7 @@ public class UpsertCompactionTaskGeneratorTest { 
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, validDocIdsMetadataInfo); assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty()); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); // test without an invalidRecordsThresholdPercent compactionConfigs = getCompactionConfigs("0", "10"); @@ -267,6 +268,7 @@ public class UpsertCompactionTaskGeneratorTest { validDocIdsMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); // test without a invalidRecordsThresholdCount compactionConfigs = getCompactionConfigs("30", "0"); @@ -275,6 +277,7 @@ public class UpsertCompactionTaskGeneratorTest { validDocIdsMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); // Test the case where the completedSegment from api has different crc than segment from zk metadata. 
json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" @@ -296,6 +299,25 @@ public class UpsertCompactionTaskGeneratorTest { Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); + + // check if both the candidates for compaction are coming in sorted descending order + json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" + + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \"" + + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 10," + "\"totalInvalidDocs\" : 40," + + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \"" + + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]"; + validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() { + }); + compactionConfigs = getCompactionConfigs("30", "0"); + segmentSelectionResult = + UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + validDocIdsMetadataInfo); + Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 2); + Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), + _completedSegment.getSegmentName()); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(1).getSegmentName(), + _completedSegment2.getSegmentName()); } private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org