sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1983771523
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -543,44 +545,45 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
   }
   @VisibleForTesting
-  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
-    if (tableType == TableType.REALTIME) {
-      // For realtime table, don't process
-      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
-      // 2. sealed segments with start time later than the earliest start time of all in progress segments
-      // This prevents those in-progress segments from not being merged.
-      //
-      // Note that we make the following two assumptions here:
-      // 1. streaming data consumer lags are negligible
-      // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
-      // than bufferTimeMS)
-      //
-      // We don't handle the following cases intentionally because it will be either overkill or too complex
-      // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
-      // forward, and may not be able to merge some lately-created segments for those new partitions -- users should
-      // configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
-      // for new partitions to be picked up
-      // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
-      // partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
-      // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
-      // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
-      // records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
-        }
+  static List<SegmentZKMetadata> filterSegmentsforRealtimeTable(List<SegmentZKMetadata> allSegments) {
+    // For realtime table, don't process
+    // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS), this has been taken care of in
+    // getNonConsumingSegmentsZKMetadataForRealtimeTable()
+    // 2. most recent sealed segments in each partition, this prevents those paused segments from being merged.
+    //
+    // Note that we make the following two assumptions here:
+    // 1. streaming data consumer lags are negligible
+    // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+    // than bufferTimeMS)
+    //
+    // We don't handle the following cases intentionally because it will be either overkill or too complex
+    // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
+    // forward, and may not be able to merge some lately-created segments for those new partitions -- users should
+    // configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
+    // for new partitions to be picked up
+    // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
+    // partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
+    // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
+    // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
+    // records correctly.
+    Map<Integer, String> partitionIdToLatestCompletedSegment = new HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (LLCSegmentName.isLLCSegment(segmentName)) {
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(), (partId, latestSegment) -> {
+          if (latestSegment == null) {
+            return segmentName;
+          } else {
+            return new LLCSegmentName(latestSegment).getSequenceNumber() > llcSegmentName.getSequenceNumber()
+                ? latestSegment : segmentName;
+          }
+        });
       }
-      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
-      return allSegments.stream()
-          .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-              && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-          .collect(Collectors.toList());
-    } else {
-      return allSegments;
     }
+    return allSegments.stream()
+        .filter(a -> !partitionIdToLatestCompletedSegment.containsValue(a.getSegmentName()))

Review Comment:
   I believe `.containsValue` is O(N). Could you please put the latest completed segments in a hash set right before the return statement?
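A minimal sketch of the change the reviewer asks for, under stated assumptions: it works on plain segment-name strings rather than `SegmentZKMetadata`, and a hypothetical `partitionAndSequence` helper stands in for Pinot's `LLCSegmentName` by splitting names of the assumed form `table__partition__sequence__creationTime`. As in the diff, it keeps the highest-sequence name per partition; it then copies those names into a `HashSet` once, right before filtering. `HashMap.containsValue` scans every entry, so calling it for each segment costs O(partitions) per lookup, while `HashSet.contains` is O(1) on average.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class LatestSegmentFilterSketch {

  // Hypothetical parser for names shaped like "table__partition__sequence__creationTime".
  // Returns {partitionId, sequenceNumber}, or null if the name does not have that shape.
  static int[] partitionAndSequence(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return null;
    }
    try {
      return new int[] {Integer.parseInt(parts[1]), Integer.parseInt(parts[2])};
    } catch (NumberFormatException e) {
      return null;
    }
  }

  // Drops the highest-sequence segment of each partition; keeps everything else in input order.
  static List<String> dropLatestPerPartition(List<String> segmentNames) {
    // partition id -> name of the highest-sequence segment seen so far
    Map<Integer, String> latestByPartition = new HashMap<>();
    for (String name : segmentNames) {
      int[] ps = partitionAndSequence(name);
      if (ps == null) {
        continue; // names without the assumed shape are never treated as "latest"
      }
      latestByPartition.merge(ps[0], name,
          (current, candidate) -> partitionAndSequence(current)[1] > ps[1] ? current : candidate);
    }
    // Build the lookup set once, right before filtering: HashSet.contains is O(1) on average,
    // whereas Map.containsValue scans every entry on each call.
    Set<String> latestNames = new HashSet<>(latestByPartition.values());
    return segmentNames.stream()
        .filter(name -> !latestNames.contains(name))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> segments = List.of(
        "myTable__0__0__20250224T0900Z",
        "myTable__0__1__20250224T1000Z",
        "myTable__1__0__20250224T0900Z",
        "uploaded_segment_0");
    System.out.println(dropLatestPerPartition(segments));
  }
}
```

With the sample names in `main`, the latest segment of each partition (`myTable__0__1__20250224T1000Z` and `myTable__1__0__20250224T0900Z`) is dropped and the name that does not match the assumed shape passes through, so the program prints `[myTable__0__0__20250224T0900Z, uploaded_segment_0]`.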
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java:
##########
@@ -283,6 +282,46 @@ public void testGenerateTasksCheckConfigs() {
     assertTrue(pinotTaskConfigs.isEmpty());
   }
+  /**
+   * Test pre-filter of task generation
+   */
+  @Test
+  public void testFilterSegmentsforRealtimeTable() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    // construct 3 following segments, among these, only 0_0 can be scheduled, others should be filtered out
+    // partition 0, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata1 =
+        getSegmentZKMetadata("testTable__0__0__0", 5000, 6000, TimeUnit.MILLISECONDS,

Review Comment:
   nit: `s/testTable__0__0__0/testTable__0__0__20250224T0900Z/`

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
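Regarding the test-name nit: the suggested replacement keeps the four `__`-separated fields of `testTable__0__0__0` but makes the last one look like a creation timestamp. A minimal sketch of generating such a fixture name, assuming a UTC `yyyyMMdd'T'HHmm'Z'` pattern inferred from the reviewer's `20250224T0900Z` example; `llcStyleName` is a hypothetical test helper, not a Pinot API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class SegmentNameSketch {
  // Assumed creation-time pattern, inferred from the reviewer's example "20250224T0900Z" (UTC).
  private static final DateTimeFormatter CREATION_TIME =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm'Z'").withZone(ZoneOffset.UTC);

  // Hypothetical test helper: builds "table__partition__sequence__creationTime".
  static String llcStyleName(String table, int partitionId, int sequenceNumber, long creationTimeMs) {
    String creationTime = CREATION_TIME.format(Instant.ofEpochMilli(creationTimeMs));
    return String.join("__", table, String.valueOf(partitionId), String.valueOf(sequenceNumber), creationTime);
  }

  public static void main(String[] args) {
    long creationTimeMs = Instant.parse("2025-02-24T09:00:00Z").toEpochMilli();
    // Prints testTable__0__0__20250224T0900Z
    System.out.println(llcStyleName("testTable", 0, 0, creationTimeMs));
  }
}
```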