This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new a39ad22d7a Change Pre-filter logic in MergeRollup task (#15177)
a39ad22d7a is described below

commit a39ad22d7ae8489f486c359bdd2656a4a8585adb
Author: Quan Yuan (Kate) <110874306+kate...@users.noreply.github.com>
AuthorDate: Fri Mar 7 13:06:24 2025 -0800

    Change Pre-filter logic in MergeRollup task (#15177)
---
 .../MergeRollupMinionClusterIntegrationTest.java   |  4 +-
 .../mergerollup/MergeRollupTaskGenerator.java      | 91 ++++++++++++----------
 .../mergerollup/MergeRollupTaskGeneratorTest.java  | 62 +++++++++++++--
 3 files changed, 108 insertions(+), 49 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 71a55da67d..e7b51c6064 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -1031,8 +1031,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     String sqlQuery = "SELECT count(*) FROM " + tableName;
     JsonNode expectedJson = postQuery(sqlQuery);
 
-    long[] expectedNumBucketsToProcess100Days = {3, 2, 1, 0, 3, 2, 1, 0};
-    long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
+    long[] expectedNumBucketsToProcess100Days = {2, 1, 0, 0, 3, 2, 1, 0};
+    long[] expectedNumBucketsToProcess200Days = {0, 0, 2, 1, 0, 0, 1, 1};
     String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     int numTasks = 0;
     TaskSchedulingContext context = new TaskSchedulingContext()
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 10dcfcbf66..c4f9bd2175 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -161,21 +162,22 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
     LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType);
 
     // Get all segment metadata
-    List<SegmentZKMetadata> allSegments = getSegmentsZKMetadataForTable(tableNameWithType);
-    // Filter segments based on status
-    List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
-        = filterSegmentsBasedOnStatus(tableConfig.getTableType(), allSegments);
+    List<SegmentZKMetadata> allSegments =
+        tableConfig.getTableType() == TableType.OFFLINE
+            ? getSegmentsZKMetadataForTable(tableNameWithType)
+            : filterSegmentsforRealtimeTable(
+                getNonConsumingSegmentsZKMetadataForRealtimeTable(tableNameWithType));
 
     // Select current segment snapshot based on lineage, filter out empty segments
     SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableNameWithType);
     Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
-    for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+    for (SegmentZKMetadata segment : allSegments) {
       preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
     }
     SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
 
     List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
-    for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+    for (SegmentZKMetadata segment : allSegments) {
       if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
           && MergeTaskUtils.allowMerge(segment)) {
         preSelectedSegments.add(segment);
@@ -543,44 +545,49 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
   }
 
   @VisibleForTesting
-  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
-    if (tableType == TableType.REALTIME) {
-      // For realtime table, don't process
-      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
-      // 2. sealed segments with start time later than the earliest start time of all in progress segments
-      //    This prevents those in-progress segments from not being merged.
-      //
-      // Note that we make the following two assumptions here:
-      // 1. streaming data consumer lags are negligible
-      // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
-      //    than bufferTimeMS)
-      //
-      // We don't handle the following cases intentionally because it will be either overkill or too complex
-      // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
-      //    forward, and may not be able to merge some lately-created segments for those new partitions -- users should
-      //    configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
-      //    for new partitions to be picked up
-      // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
-      //    partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
-      //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
-      //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
-      //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
-        }
+  static List<SegmentZKMetadata> filterSegmentsforRealtimeTable(List<SegmentZKMetadata> allSegments) {
+    // For realtime table, don't process
+    // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS), this has been taken care of in
+    //    getNonConsumingSegmentsZKMetadataForRealtimeTable()
+    // 2. most recent sealed segments in each partition, this prevents those paused segments from being merged.
+    //
+    // Note that we make the following two assumptions here:
+    // 1. streaming data consumer lags are negligible
+    // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+    //    than bufferTimeMS)
+    //
+    // We don't handle the following cases intentionally because it will be either overkill or too complex
+    // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
+    //    forward, and may not be able to merge some lately-created segments for those new partitions -- users should
+    //    configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
+    //    for new partitions to be picked up
+    // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
+    //    partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
+    //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
+    //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
+    //    records correctly.
+    Map<Integer, LLCSegmentName> partitionIdToLatestCompletedSegment = new HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (LLCSegmentName.isLLCSegment(segmentName)) {
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(), (partId, latestSegment) -> {
+          if (latestSegment == null) {
+            return llcSegmentName;
+          } else {
+            return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber()
+                ? latestSegment : llcSegmentName;
+          }
+        });
       }
-      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
-      return allSegments.stream()
-          .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-              && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-          .collect(Collectors.toList());
-    } else {
-      return allSegments;
     }
+    Set<String> filteredSegmentNames = new HashSet<>();
+    for (LLCSegmentName llcSegmentName : partitionIdToLatestCompletedSegment.values()) {
+      filteredSegmentNames.add(llcSegmentName.getSegmentName());
+    }
+    return allSegments.stream()
+        .filter(a -> !filteredSegmentNames.contains(a.getSegmentName()))
+        .collect(Collectors.toList());
   }
 
   /**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 18d60d0f9b..7665e7308f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -246,7 +246,7 @@ public class MergeRollupTaskGeneratorTest {
     // the two following segments will be skipped when generating tasks
     SegmentZKMetadata realtimeTableSegmentMetadata1 =
         getSegmentZKMetadata("testTable__0__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
-    realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     SegmentZKMetadata realtimeTableSegmentMetadata2 =
         getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
@@ -266,15 +266,14 @@
 
     // Skip task generation, if the table is a realtime table and all segments are skipped
     // We don't test realtime REFRESH table because this combination does not make sense
-    assertTrue(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME,
-        Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)).isEmpty());
+    assertTrue(MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+        Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)
+    ).isEmpty());
     TableConfig realtimeTableConfig = getTableConfig(TableType.REALTIME, new HashMap<>());
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
     // Skip task generation, if the table is an offline REFRESH table
-    assertFalse(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE,
-        Lists.newArrayList(offlineTableSegmentMetadata)).isEmpty());
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", null));
     TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, new HashMap<>());
@@ -283,6 +282,46 @@
     assertTrue(pinotTaskConfigs.isEmpty());
   }
 
+  /**
+   * Test pre-filter of task generation
+   */
+  @Test
+  public void testFilterSegmentsforRealtimeTable() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    // construct 3 following segments, among these, only 0_0 can be scheduled, others should be filtered out
+    // partition 0, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata1 =
+        getSegmentZKMetadata("testTable__0__0__20250224T0900Z", 5000, 6000, TimeUnit.MILLISECONDS,
+            null, "50000", "60000");
+    realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    // partition 0, completed 1
+    SegmentZKMetadata realtimeTableSegmentMetadata2 =
+        getSegmentZKMetadata("testTable__0__1__20250224T0902Z", 6000, 7000, TimeUnit.MILLISECONDS,
+            null, "60000", "70000");
+    realtimeTableSegmentMetadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    // partition 1, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata3 =
+        getSegmentZKMetadata("testTable__1__0__20250224T0900Z", 5500, 6500, TimeUnit.MILLISECONDS,
+            null, "55000", "65000");
+    realtimeTableSegmentMetadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2,
+            realtimeTableSegmentMetadata3));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+        getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList("testTable__0", "server0", "ONLINE")));
+
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+
+    List<SegmentZKMetadata> filterResult = MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+        Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2,
+            realtimeTableSegmentMetadata3));
+    assertEquals(filterResult.size(), 1);
+    assertEquals(filterResult.get(0).getSegmentName(), "testTable__0__0__20250224T0900Z");
+  }
+
   private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, String segments, String mergeLevel,
       String mergeType, String partitionBucketTimePeriod, String roundBucketTimePeriod,
       String maxNumRecordsPerSegments) {
@@ -1034,6 +1073,19 @@
     return segmentZKMetadata;
   }
 
+  private SegmentZKMetadata getSegmentZKMetadata(String segmentName, long startTime, long endTime, TimeUnit timeUnit,
+      String downloadURL, String startOffset, String endOffset) {
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
+    segmentZKMetadata.setStartTime(startTime);
+    segmentZKMetadata.setEndTime(endTime);
+    segmentZKMetadata.setTimeUnit(timeUnit);
+    segmentZKMetadata.setDownloadUrl(downloadURL);
+    segmentZKMetadata.setTotalDocs(1000);
+    segmentZKMetadata.setStartOffset(startOffset);
+    segmentZKMetadata.setEndOffset(endOffset);
+    return segmentZKMetadata;
+  }
+
   private IdealState getIdealState(String tableName, List<String> segmentNames) {
     IdealState idealState = new IdealState(tableName);
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
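For readers skimming the patch, a minimal standalone sketch of the new pre-filter idea: before scheduling merges for a realtime table, exclude the most recent completed segment of each partition. This sketch is not part of the commit; it assumes LLC-style segment names of the form <table>__<partitionId>__<sequenceNumber>__<creationTime> (as in the test data above) and uses plain string parsing in place of the pinot-common LLCSegmentName helper, so the class and method names here are illustrative only.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only -- NOT part of the commit above. It mimics the per-partition
// "latest completed segment" pre-filter of filterSegmentsforRealtimeTable(), but parses
// the segment name directly instead of using org.apache.pinot.common.utils.LLCSegmentName.
public class LatestCompletedSegmentFilterSketch {

  public static void main(String[] args) {
    // Same three completed segments as the new unit test in the patch.
    List<String> completedSegments = List.of(
        "testTable__0__0__20250224T0900Z",
        "testTable__0__1__20250224T0902Z",
        "testTable__1__0__20250224T0900Z");
    // Prints [testTable__0__0__20250224T0900Z]: the highest-sequence segment of
    // partition 0 (sequence 1) and of partition 1 (sequence 0) are both held back.
    System.out.println(filterOutLatestPerPartition(completedSegments));
  }

  // Keep a segment only if it is not the latest (highest sequence number) completed
  // segment of its partition, so the tail of each partition is left unmerged even when
  // consumption is paused and no newer in-progress segment exists yet.
  static List<String> filterOutLatestPerPartition(List<String> segmentNames) {
    Map<Integer, String> latestPerPartition = new HashMap<>();
    for (String name : segmentNames) {
      latestPerPartition.merge(partitionId(name), name,
          (current, candidate) -> sequenceNumber(candidate) > sequenceNumber(current) ? candidate : current);
    }
    Set<String> excluded = new HashSet<>(latestPerPartition.values());
    List<String> result = new ArrayList<>();
    for (String name : segmentNames) {
      if (!excluded.contains(name)) {
        result.add(name);
      }
    }
    return result;
  }

  // LLC segment names look like "<table>__<partitionId>__<sequenceNumber>__<creationTime>".
  private static int partitionId(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  private static int sequenceNumber(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[2]);
  }
}

In the actual task generator this filter runs on the output of getNonConsumingSegmentsZKMetadataForRealtimeTable(), so IN_PROGRESS segments are already removed before the per-partition step shown here.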