snleee commented on code in PR #9890: URL: https://github.com/apache/pinot/pull/9890#discussion_r1043864051
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java: ########## @@ -430,22 +448,70 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { return pinotTaskConfigs; } + @VisibleForTesting + static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) { + if (tableType == TableType.REALTIME) { + // For realtime table, don't process + // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS) + // 2. sealed segments with start time later than the earliest start time of all in progress segments + // This prevents those in-progress segments from not being merged. + // + // Note that we make the following two assumptions here: + // 1. streaming data consumer lags are negligible + // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger + // than bufferTimeMS) + // + // We don't handle the following cases intentionally because it will be either overkill or too complex + // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks + // forward, and may not be able to merge some lately-created segments for those new partitions -- users should + // configure pinot properly to discover new partitions timely, or they should restart pinot servers manually + // for new partitions to be picked up + // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for + // partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases, + // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may + // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those + // records correctly. + long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE; + for (SegmentZKMetadata segmentZKMetadata : allSegments) { + if (!segmentZKMetadata.getStatus().isCompleted() + && segmentZKMetadata.getTotalDocs() > 0 + && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) { + earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs(); + } + } + final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments; + return allSegments.stream() + .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted() + && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments) + .collect(Collectors.toList()); + } else { + return allSegments; + } + } + /** * Validate table config for merge/rollup task */ - private boolean validate(TableConfig tableConfig, String taskType) { - String offlineTableName = tableConfig.getTableName(); - if (tableConfig.getTableType() != TableType.OFFLINE) { - LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", taskType, - offlineTableName); - return false; - } - + @VisibleForTesting + static boolean validate(TableConfig tableConfig, String taskType) { + String tableNameWithType = tableConfig.getTableName(); if (REFRESH.equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))) { LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", taskType, - offlineTableName); + tableNameWithType); return false; } + if (tableConfig.getTableType() == TableType.REALTIME) { + if (tableConfig.isUpsertEnabled()) { Review Comment: +1 Thanks for adding this ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java: ########## @@ -65,13 +65,14 @@ /** - * Integration test for minion task of type "MergeRollupTask" + * Integration test for minion task of type "MergeRollupTask" configured on offline tables Review Comment: Update the comment to remove `offline tables` since we combined the test :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org