swaminathanmanish commented on code in PR #14623: URL: https://github.com/apache/pinot/pull/14623#discussion_r1900822037
########## pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import java.util.List; +import java.util.Objects; +import java.util.UUID; + + +/** + * ExpectedRealtimeOfflineTaskResult is created in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor} + * before uploading offline segment(s) to the offline table. + * + * The <code>_segmentsFrom</code> denotes the input RealtimeSegments. + * The <code>_segmentsTo</code> denotes the expected offline segemnts. + * The <code>_id</code> denotes the unique identifier of object. + * The <code>_taskID</code> denotes the minion taskId. + * The <code>_taskFailure</code> denotes the failure status of minion task handling the + * current ExpectedResult. This is modified in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator} + * when a prev minion task is failed. + * + */ +public class ExpectedSubtaskResult { Review Comment: Is this applicable for other tasks. 
If not, I'd suggest moving this data structure to a RTO specific module. ########## pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import java.util.List; +import java.util.Objects; +import java.util.UUID; + + +/** + * ExpectedRealtimeOfflineTaskResult is created in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor} + * before uploading offline segment(s) to the offline table. + * + * The <code>_segmentsFrom</code> denotes the input RealtimeSegments. + * The <code>_segmentsTo</code> denotes the expected offline segemnts. + * The <code>_id</code> denotes the unique identifier of object. + * The <code>_taskID</code> denotes the minion taskId. + * The <code>_taskFailure</code> denotes the failure status of minion task handling the + * current ExpectedResult. This is modified in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator} + * when a prev minion task is failed. 
+ * + */ +public class ExpectedSubtaskResult { + private final List<String> _segmentsFrom; + private final List<String> _segmentsTo; + private final String _id; + private final String _taskID; + private boolean _taskFailure = false; + + public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, String taskID) { + _segmentsFrom = segmentsFrom; + _segmentsTo = segmentsTo; + _taskID = taskID; + _id = UUID.randomUUID().toString(); + } + + public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, + String id, String taskID, boolean taskFailure) { + _segmentsFrom = segmentsFrom; + _segmentsTo = segmentsTo; + _id = id; + _taskID = taskID; + _taskFailure = taskFailure; + } + + public String getTaskID() { Review Comment: Can this itself be a uniqueId instead of new _id field? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java: ########## @@ -194,19 +205,54 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, /** * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table. - * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update - * watermark in the ZNode - * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it + * Before uploading the segments, updates the metadata with the expected results + * of the successful execution of current subtask. + * The expected result updated in metadata is read by the next iteration of Task Generator + * to ensure data correctness. 
*/ @Override - public void postProcess(PinotTaskConfig pinotTaskConfig) { - Map<String, String> configs = pinotTaskConfig.getConfigs(); - String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY); - long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY)); - RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata = - new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs); - _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE, - _expectedVersion); + protected void preUploadSegments(SegmentUploadContext context) + throws Exception { + super.preUploadSegments(context); + String realtimeTableName = context.getTableNameWithType(); + int attemptCount; + try { + attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> { + try { + ZNRecord realtimeToOfflineSegmentsTaskZNRecord = + _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName, + RealtimeToOfflineSegmentsTask.TASK_TYPE); + int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion(); + + // Adding ExpectedRealtimeToOfflineSegmentsTaskResult might fail. + // In-case of failure there will be runtime exception thrown + RealtimeToOfflineSegmentsTaskMetadata updatedRealtimeToOfflineSegmentsTaskMetadata = + getUpdatedTaskMetadata(context, realtimeToOfflineSegmentsTaskZNRecord); + + // Setting to zookeeper might fail due to version mismatch, but in this case + // the exception is caught and retried. 
+ _minionTaskZkMetadataManager.setTaskMetadataZNRecord(updatedRealtimeToOfflineSegmentsTaskMetadata, + RealtimeToOfflineSegmentsTask.TASK_TYPE, + expectedVersion); + return true; + } catch (ZkBadVersionException e) { + LOGGER.info( + "Version changed while updating num of subtasks left in RTO task metadata for table: {}, Retrying.", + realtimeTableName); + return false; + } + }); + } catch (Exception e) { + String errorMsg = + String.format("Failed to update the RealtimeToOfflineSegmentsTaskMetadata during preUploadSegments. " Review Comment: Can you enhance this "failed to update to Zk after N attempts" or something so that we know from the log its a Zk update failure. ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. 
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. + // These are used to validate if previous minion task was successful or not + String offlineTableName = + TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName)); + Set<String> existingOfflineTableSegmentNames = + new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true)); + + // In-case of previous minion task failures, get info + // of failed minion subtasks. They need to be reprocessed. + Set<String> failedTaskInputSegments = + getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames); + + // In-case of partial failure of segments upload in prev minion task run, + // data is inconsistent, delete the corresponding offline segments immediately. 
+ if (!failedTaskInputSegments.isEmpty()) { + deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames, + realtimeToOfflineSegmentsTaskMetadata); + } + + List<SegmentZKMetadata> segmentsToBeReProcessed = + filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata); + + // if no segment to be reprocessed, no failure + boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty(); + + List<List<String>> segmentNamesGroupList = new ArrayList<>(); + Map<String, String> segmentNameVsDownloadURL = new HashMap<>(); + + // maxNumRecordsPerTask is used to divide a minion tasks among + // multiple subtasks to improve performance. + int maxNumRecordsPerTask = + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null + ? Integer.parseInt( + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)) + : DEFAULT_MAX_NUM_RECORDS_PER_TASK; + + List<SegmentZKMetadata> segmentsToBeScheduled; + + if (!prevMinionTaskSuccessful) { + segmentsToBeScheduled = segmentsToBeReProcessed; + } else { + // if all offline segments of prev minion tasks were successfully uploaded, + // we can clear the state of prev minion tasks as now it's useless. + if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(). + isEmpty()) { + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear(); + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear(); + // windowEndTime of prev minion task needs to be re-used for picking up the + // next windowStartTime. This is useful for case where user changes minion config + // after a minion task run was complete. So windowStartTime cannot be watermark + bucketMs + windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); } + long windowEndMs = windowStartMs + bucketMs; + // since window changed, pick new segments. 
+ segmentsToBeScheduled = + generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs, + bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata); + } - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); - long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); - - // Check overlap with window - if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { - // If last completed segment is being used, make sure that segment crosses over end of window. - // In the absence of this check, CONSUMING segments could contain some portion of the window. That data - // would be skipped forever. - if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { - LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. 
Skipping task " - + "generation: {}", segmentName, taskType); - skipGenerate = true; - break; - } - segmentNames.add(segmentName); - downloadURLs.add(segmentZKMetadata.getDownloadUrl()); + divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL, + maxNumRecordsPerTask); + + if (segmentNamesGroupList.isEmpty()) { + continue; + } + + List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>(); + long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); + long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); + + LOGGER.info( + "generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType, + windowStartMs, + newWindowEndTime, realtimeTableName); + + for (List<String> segmentNameList : segmentNamesGroupList) { + List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL); + Preconditions.checkState(segmentNameList.size() == downloadURLList.size()); + pinotTaskConfigsForTable.add( + createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig, + newWindowStartTime, + newWindowEndTime, taskType)); + } + try { + _clusterInfoAccessor + .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, + MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + expectedVersion); + } catch (ZkBadVersionException e) { + LOGGER.error( + "Version changed while updating RTO task metadata for table: {}, skip scheduling. 
There are " + + "multiple task schedulers for the same table, need to investigate!", realtimeTableName); + // skip this table for this minion run + continue; + } + + pinotTaskConfigs.addAll(pinotTaskConfigsForTable); + + LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType); + } + return pinotTaskConfigs; + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, + "RealtimeToOfflineTask doesn't support upsert table!"); + // check no malformed period + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) + .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) + .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + + Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskConfigs.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState(columnNames.contains( + StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)), + String.format("Column \"%s\" not found in schema!", entry.getKey())); + try { + // check that it's a valid aggregation function type + 
AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); + // check that a value aggregator is available + if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { + throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); } + } catch (IllegalArgumentException e) { + String err = + String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); + throw new IllegalStateException(err); } - if (skipGenerate || !segmentNames.isEmpty()) { - break; - } + } + } + } - LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket", - taskType, windowStartMs, windowEndMs); - windowStartMs = windowEndMs; - windowEndMs += bucketMs; + private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) { + List<String> downloadURLList = new ArrayList<>(); + for (String segmentName : segmentNameList) { + downloadURLList.add(segmentNameVsDownloadURL.get(segmentName)); + } + return downloadURLList; + } + + private void deleteInvalidOfflineSegments(String offlineTableName, + Set<String> realtimeSegmentsToBeReProcessed, + Set<String> existingOfflineTableSegmentNames, + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) { + + Map<String, String> segmentNameToExpectedSubtaskResultID = + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(); + Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap(); + + Set<String> segmentsToBeDeleted = new HashSet<>(); + + for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) { + String id = segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName); + Preconditions.checkNotNull(id); + ExpectedSubtaskResult expectedSubtaskResult = + 
expectedSubtaskResultMap.get(id); + // if already marked as failure, no need to delete again. + if (expectedSubtaskResult.isTaskFailure()) { + continue; } + List<String> expectedCorrespondingOfflineSegments = expectedSubtaskResult.getSegmentsTo(); + segmentsToBeDeleted.addAll( Review Comment: Add log.info, to indicate the number of segments that needs to deleted, if not the segment names (if its a long list). ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. 
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. + // These are used to validate if previous minion task was successful or not + String offlineTableName = + TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName)); + Set<String> existingOfflineTableSegmentNames = + new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true)); + + // In-case of previous minion task failures, get info + // of failed minion subtasks. They need to be reprocessed. + Set<String> failedTaskInputSegments = + getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames); + + // In-case of partial failure of segments upload in prev minion task run, + // data is inconsistent, delete the corresponding offline segments immediately. 
+ if (!failedTaskInputSegments.isEmpty()) { + deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames, + realtimeToOfflineSegmentsTaskMetadata); + } + + List<SegmentZKMetadata> segmentsToBeReProcessed = + filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata); + + // if no segment to be reprocessed, no failure + boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty(); + + List<List<String>> segmentNamesGroupList = new ArrayList<>(); + Map<String, String> segmentNameVsDownloadURL = new HashMap<>(); + + // maxNumRecordsPerTask is used to divide a minion tasks among + // multiple subtasks to improve performance. + int maxNumRecordsPerTask = + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null + ? Integer.parseInt( + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)) + : DEFAULT_MAX_NUM_RECORDS_PER_TASK; + + List<SegmentZKMetadata> segmentsToBeScheduled; + + if (!prevMinionTaskSuccessful) { + segmentsToBeScheduled = segmentsToBeReProcessed; + } else { + // if all offline segments of prev minion tasks were successfully uploaded, + // we can clear the state of prev minion tasks as now it's useless. + if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(). + isEmpty()) { + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear(); Review Comment: This is idempotent and should be fine upon re-execution? 
(i.e if there's failure prior to updating this state in Zk) ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. 
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. + // These are used to validate if previous minion task was successful or not + String offlineTableName = + TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName)); + Set<String> existingOfflineTableSegmentNames = + new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true)); + + // In-case of previous minion task failures, get info + // of failed minion subtasks. They need to be reprocessed. + Set<String> failedTaskInputSegments = + getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames); + + // In-case of partial failure of segments upload in prev minion task run, + // data is inconsistent, delete the corresponding offline segments immediately. 
+ if (!failedTaskInputSegments.isEmpty()) { + deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames, + realtimeToOfflineSegmentsTaskMetadata); + } + + List<SegmentZKMetadata> segmentsToBeReProcessed = + filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata); + + // if no segment to be reprocessed, no failure + boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty(); + + List<List<String>> segmentNamesGroupList = new ArrayList<>(); + Map<String, String> segmentNameVsDownloadURL = new HashMap<>(); + + // maxNumRecordsPerTask is used to divide a minion tasks among + // multiple subtasks to improve performance. + int maxNumRecordsPerTask = + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null + ? Integer.parseInt( + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)) + : DEFAULT_MAX_NUM_RECORDS_PER_TASK; + + List<SegmentZKMetadata> segmentsToBeScheduled; + + if (!prevMinionTaskSuccessful) { + segmentsToBeScheduled = segmentsToBeReProcessed; + } else { + // if all offline segments of prev minion tasks were successfully uploaded, + // we can clear the state of prev minion tasks as now it's useless. + if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(). + isEmpty()) { + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear(); + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear(); + // windowEndTime of prev minion task needs to be re-used for picking up the + // next windowStartTime. This is useful for case where user changes minion config + // after a minion task run was complete. So windowStartTime cannot be watermark + bucketMs + windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); } + long windowEndMs = windowStartMs + bucketMs; + // since window changed, pick new segments. 
+ segmentsToBeScheduled = + generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs, + bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata); + } - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); - long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); - - // Check overlap with window - if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { - // If last completed segment is being used, make sure that segment crosses over end of window. - // In the absence of this check, CONSUMING segments could contain some portion of the window. That data - // would be skipped forever. - if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { - LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. 
Skipping task " - + "generation: {}", segmentName, taskType); - skipGenerate = true; - break; - } - segmentNames.add(segmentName); - downloadURLs.add(segmentZKMetadata.getDownloadUrl()); + divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL, + maxNumRecordsPerTask); + + if (segmentNamesGroupList.isEmpty()) { + continue; + } + + List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>(); + long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); + long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); + + LOGGER.info( + "generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType, + windowStartMs, + newWindowEndTime, realtimeTableName); + + for (List<String> segmentNameList : segmentNamesGroupList) { + List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL); + Preconditions.checkState(segmentNameList.size() == downloadURLList.size()); + pinotTaskConfigsForTable.add( + createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig, + newWindowStartTime, + newWindowEndTime, taskType)); + } + try { + _clusterInfoAccessor + .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, + MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + expectedVersion); + } catch (ZkBadVersionException e) { + LOGGER.error( + "Version changed while updating RTO task metadata for table: {}, skip scheduling. 
There are " + + "multiple task schedulers for the same table, need to investigate!", realtimeTableName); + // skip this table for this minion run + continue; + } + + pinotTaskConfigs.addAll(pinotTaskConfigsForTable); + + LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType); + } + return pinotTaskConfigs; + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, + "RealtimeToOfflineTask doesn't support upsert table!"); + // check no malformed period + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) + .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) + .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + + Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskConfigs.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState(columnNames.contains( + StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)), + String.format("Column \"%s\" not found in schema!", entry.getKey())); + try { + // check that it's a valid aggregation function type + 
AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); + // check that a value aggregator is available + if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { + throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); } + } catch (IllegalArgumentException e) { + String err = + String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); + throw new IllegalStateException(err); } - if (skipGenerate || !segmentNames.isEmpty()) { - break; - } + } + } + } - LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket", - taskType, windowStartMs, windowEndMs); - windowStartMs = windowEndMs; - windowEndMs += bucketMs; + private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) { + List<String> downloadURLList = new ArrayList<>(); + for (String segmentName : segmentNameList) { + downloadURLList.add(segmentNameVsDownloadURL.get(segmentName)); + } + return downloadURLList; + } + + private void deleteInvalidOfflineSegments(String offlineTableName, + Set<String> realtimeSegmentsToBeReProcessed, + Set<String> existingOfflineTableSegmentNames, + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) { + + Map<String, String> segmentNameToExpectedSubtaskResultID = + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(); + Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap(); + + Set<String> segmentsToBeDeleted = new HashSet<>(); + + for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) { + String id = segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName); + Preconditions.checkNotNull(id); + ExpectedSubtaskResult expectedSubtaskResult = + 
expectedSubtaskResultMap.get(id); + // if already marked as failure, no need to delete again. + if (expectedSubtaskResult.isTaskFailure()) { + continue; } + List<String> expectedCorrespondingOfflineSegments = expectedSubtaskResult.getSegmentsTo(); + segmentsToBeDeleted.addAll( + getSegmentsToDelete(expectedCorrespondingOfflineSegments, existingOfflineTableSegmentNames)); + // The expectedRealtimeToOfflineTaskResult is confirmed to be + // related to a failed task. Mark it as a failure, since executor will + // then only replace expectedRealtimeToOfflineTaskResult for the + // segments to be reprocessed. + expectedSubtaskResult.setTaskFailure(); + } + + if (!segmentsToBeDeleted.isEmpty()) { + _clusterInfoAccessor.getPinotHelixResourceManager() + .deleteSegments(offlineTableName, new ArrayList<>(segmentsToBeDeleted)); + } + } + + private Set<String> getFailedTaskSegments( + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata, + Set<String> existingOfflineTableSegmentNames) { + Set<String> failedIds = new HashSet<>(); + + // Get all the ExpectedRealtimeToOfflineTaskResult of prev minion task + Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap(); + Collection<ExpectedSubtaskResult> expectedSubtaskResultList = + expectedSubtaskResultMap.values(); + + Map<String, String> segmentNameToExpectedSubtaskResultID = + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(); + Set<String> expectedSubtaskResultIds = + new HashSet<>(segmentNameToExpectedSubtaskResultID.values()); + + Set<String> segmentNamesToReprocess = new HashSet<>(); - if (skipGenerate) { + // Check what all offline segments are present currently + for (ExpectedSubtaskResult expectedSubtaskResult + : expectedSubtaskResultList) { + + if (expectedSubtaskResult.isTaskFailure()) { + // if task is failure and is referenced by any segment, only then add to failed task. 
+ if (expectedSubtaskResultIds.contains(expectedSubtaskResult.getId())) { + failedIds.add(expectedSubtaskResult.getId()); + } continue; } - Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs, - _clusterInfoAccessor); - configs.putAll(getBaseTaskConfigs(tableConfig, segmentNames)); - configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR)); - configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); - - // Segment processor configs - configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs)); - configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs)); - String roundBucketTimePeriod = taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY); - if (roundBucketTimePeriod != null) { - configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, roundBucketTimePeriod); + // get offline segments + List<String> segmentTo = expectedSubtaskResult.getSegmentsTo(); + + // If not all corresponding offline segments to a realtime segment exists, + // it means there was an issue with prev minion task. And segment needs + // to be re-processed. + boolean taskSuccessful = checkIfAllSegmentsExists(segmentTo, existingOfflineTableSegmentNames); Review Comment: This is despite the taskFailure check "isTaskFailure" ? If that's the case, do we really need taskFailure status and instead just validate segmentsTo with the offline table segments, for existence? ########## pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import java.util.List; +import java.util.Objects; +import java.util.UUID; + + +/** + * ExpectedRealtimeOfflineTaskResult is created in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor} + * before uploading offline segment(s) to the offline table. + * + * The <code>_segmentsFrom</code> denotes the input RealtimeSegments. + * The <code>_segmentsTo</code> denotes the expected offline segemnts. + * The <code>_id</code> denotes the unique identifier of object. + * The <code>_taskID</code> denotes the minion taskId. + * The <code>_taskFailure</code> denotes the failure status of minion task handling the + * current ExpectedResult. This is modified in + * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator} + * when a prev minion task is failed. + * + */ +public class ExpectedSubtaskResult { Review Comment: Can we name this something like RealtimeToOfflineCheckpoint or something ? 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java: ########## @@ -156,6 +162,11 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); + // Since multiple subtasks run in parallel, there shouldn't be a name conflict. + // Append uuid + segmentProcessorConfigBuilder.setSegmentNameGenerator( Review Comment: I feel this should be left to the user to setup the naming scheme and we pick a default/generic naming scheme, instead of using a specific one. Can we not use the default name generator in SegmentProcessorFramework. Isn't time based normalized name generator used? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java: ########## @@ -99,12 +107,10 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) { RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord); long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY)); - Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() <= windowStartMs, + Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs() == windowStartMs, Review Comment: Nice stricter check ! Please confirm this. 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -107,6 +115,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType); // Only schedule 1 task of this type, per table + // Still there can be scenario where generator can generate additional task, while previous task + // is just about to be enqueued in the helix queue. Review Comment: Can you elaborate on this comment? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. 
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. Review Comment: Please add sufficient log.info (without flooding:)), that can help someone debug what's going on (specifically differentiating between happy and failure paths and when there's failure, what action took place/which segments had to be reprocessed or deleted... ########## pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java: ########## @@ -18,57 +18,183 @@ */ package org.apache.pinot.common.minion; +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.helix.zookeeper.datamodel.ZNRecord; /** - * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>. - * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed. - * + * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>. The <code>_windowStartMs</code> + * denotes the time (exclusive) until which it's certain that tasks have been completed successfully.
The + * <code>_expectedSubtaskResultMap</code> contains the expected RTO tasks result info. This map can contain both + * completed and in-completed Tasks expected Results. This map is used by generator to validate whether a potential + * segment (for RTO task) has already been successfully processed as a RTO task in the past or not. The + * <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time of currently not + * successfully completed minion task. bucket: [_windowStartMs, _windowEndMs) The window is updated by generator when + * it's certain that prev minon task run is successful. + * <p> * This gets serialized and stored in zookeeper under the path * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask - * - * PinotTaskGenerator: - * The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>, - * to determine the window of execution for the task it is generating. - * The window of execution will be [watermarkMs, watermarkMs + bucketSize) - * - * PinotTaskExecutor: - * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: - * - Verify that is is running the latest task scheduled by the task generator - * - Update the watermark as the end of the window that it executed for + * <p> + * PinotTaskGenerator: The <code>_windowStartMs</code>> is used by the + * <code>RealtimeToOfflineSegmentsTaskGenerator</code>, to determine the window of execution of the prev task based on + * which it generates new task. + * <p> + * PinotTaskExecutor: The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: + * - Verify that it's running the latest task scheduled by the task generator. + * - The _expectedSubtaskResultMap is updated before the offline segments are uploaded to the table. 
*/ public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata { - private static final String WATERMARK_KEY = "watermarkMs"; + private static final String WINDOW_START_KEY = "watermarkMs"; + private static final String WINDOW_END_KEY = "windowEndMs"; + private static final String COMMA_SEPARATOR = ","; + private static final String SEGMENT_NAME_TO_EXPECTED_SUBTASK_RESULT_ID_KEY = "segmentToExpectedSubtaskResultId"; private final String _tableNameWithType; - private final long _watermarkMs; + private long _windowStartMs; + private long _windowEndMs; + private final Map<String, ExpectedSubtaskResult> _expectedSubtaskResultMap; + private final Map<String, String> _segmentNameToExpectedSubtaskResultID; Review Comment: Why do we need this Map? Can we not use the previous map itself to get the list of segmentsFrom and segmentsTo for a specific taskId? We can then construct this map in memory from that, right? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ?
realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. + long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. + // These are used to validate if previous minion task was successful or not + String offlineTableName = + TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName)); + Set<String> existingOfflineTableSegmentNames = + new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true)); + + // In-case of previous minion task failures, get info + // of failed minion subtasks. They need to be reprocessed. + Set<String> failedTaskInputSegments = + getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames); + + // In-case of partial failure of segments upload in prev minion task run, + // data is inconsistent, delete the corresponding offline segments immediately. 
+ if (!failedTaskInputSegments.isEmpty()) { + deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames, + realtimeToOfflineSegmentsTaskMetadata); + } + + List<SegmentZKMetadata> segmentsToBeReProcessed = + filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata); + + // if no segment to be reprocessed, no failure + boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty(); + + List<List<String>> segmentNamesGroupList = new ArrayList<>(); + Map<String, String> segmentNameVsDownloadURL = new HashMap<>(); + + // maxNumRecordsPerTask is used to divide a minion tasks among + // multiple subtasks to improve performance. + int maxNumRecordsPerTask = + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null + ? Integer.parseInt( + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)) + : DEFAULT_MAX_NUM_RECORDS_PER_TASK; + + List<SegmentZKMetadata> segmentsToBeScheduled; + + if (!prevMinionTaskSuccessful) { + segmentsToBeScheduled = segmentsToBeReProcessed; + } else { + // if all offline segments of prev minion tasks were successfully uploaded, + // we can clear the state of prev minion tasks as now it's useless. + if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(). + isEmpty()) { + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear(); + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear(); + // windowEndTime of prev minion task needs to be re-used for picking up the + // next windowStartTime. This is useful for case where user changes minion config + // after a minion task run was complete. So windowStartTime cannot be watermark + bucketMs + windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); } + long windowEndMs = windowStartMs + bucketMs; + // since window changed, pick new segments. 
+ segmentsToBeScheduled = + generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs, + bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata); + } - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); - long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); - - // Check overlap with window - if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { - // If last completed segment is being used, make sure that segment crosses over end of window. - // In the absence of this check, CONSUMING segments could contain some portion of the window. That data - // would be skipped forever. - if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { - LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. 
Skipping task " - + "generation: {}", segmentName, taskType); - skipGenerate = true; - break; - } - segmentNames.add(segmentName); - downloadURLs.add(segmentZKMetadata.getDownloadUrl()); + divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL, + maxNumRecordsPerTask); + + if (segmentNamesGroupList.isEmpty()) { + continue; + } + + List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>(); + long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); + long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); + + LOGGER.info( + "generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType, + windowStartMs, + newWindowEndTime, realtimeTableName); + + for (List<String> segmentNameList : segmentNamesGroupList) { + List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL); + Preconditions.checkState(segmentNameList.size() == downloadURLList.size()); + pinotTaskConfigsForTable.add( + createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig, + newWindowStartTime, + newWindowEndTime, taskType)); + } + try { + _clusterInfoAccessor + .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, + MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + expectedVersion); + } catch (ZkBadVersionException e) { + LOGGER.error( + "Version changed while updating RTO task metadata for table: {}, skip scheduling. 
There are " + + "multiple task schedulers for the same table, need to investigate!", realtimeTableName); + // skip this table for this minion run + continue; + } + + pinotTaskConfigs.addAll(pinotTaskConfigsForTable); + + LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType); + } + return pinotTaskConfigs; + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, + "RealtimeToOfflineTask doesn't support upsert table!"); + // check no malformed period + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) + .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) + .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + + Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskConfigs.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState(columnNames.contains( + StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)), + String.format("Column \"%s\" not found in schema!", entry.getKey())); + try { + // check that it's a valid aggregation function type + 
AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); + // check that a value aggregator is available + if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { + throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); } + } catch (IllegalArgumentException e) { + String err = + String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); + throw new IllegalStateException(err); } - if (skipGenerate || !segmentNames.isEmpty()) { - break; - } + } + } + } - LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket", - taskType, windowStartMs, windowEndMs); - windowStartMs = windowEndMs; - windowEndMs += bucketMs; + private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) { + List<String> downloadURLList = new ArrayList<>(); + for (String segmentName : segmentNameList) { + downloadURLList.add(segmentNameVsDownloadURL.get(segmentName)); + } + return downloadURLList; + } + + private void deleteInvalidOfflineSegments(String offlineTableName, + Set<String> realtimeSegmentsToBeReProcessed, + Set<String> existingOfflineTableSegmentNames, + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) { + + Map<String, String> segmentNameToExpectedSubtaskResultID = + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(); + Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap(); + + Set<String> segmentsToBeDeleted = new HashSet<>(); + + for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) { + String id = segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName); + Preconditions.checkNotNull(id); + ExpectedSubtaskResult expectedSubtaskResult = + 
expectedSubtaskResultMap.get(id); + // if already marked as failure, no need to delete again. + if (expectedSubtaskResult.isTaskFailure()) { Review Comment: Do we need this state ? Can we not use offline table as source of truth of whether a segment exists or not. ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); - // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = - // windowStart + bucket. - long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); - long windowEndMs = windowStartMs + bucketMs; + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); + int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1; + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs, + realtimeToOfflineZNRecord); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. 
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values()); - boolean skipGenerate = false; - while (true) { - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task " - + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - skipGenerate = true; - break; + + // Get all offline table segments. + // These are used to validate if previous minion task was successful or not + String offlineTableName = + TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName)); + Set<String> existingOfflineTableSegmentNames = + new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true)); + + // In-case of previous minion task failures, get info + // of failed minion subtasks. They need to be reprocessed. + Set<String> failedTaskInputSegments = + getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames); + + // In-case of partial failure of segments upload in prev minion task run, + // data is inconsistent, delete the corresponding offline segments immediately. 
+ if (!failedTaskInputSegments.isEmpty()) { + deleteInvalidOfflineSegments(offlineTableName, failedTaskInputSegments, existingOfflineTableSegmentNames, + realtimeToOfflineSegmentsTaskMetadata); + } + + List<SegmentZKMetadata> segmentsToBeReProcessed = + filterOutRemovedSegments(failedTaskInputSegments, completedRealtimeSegmentsZKMetadata); + + // if no segment to be reprocessed, no failure + boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty(); + + List<List<String>> segmentNamesGroupList = new ArrayList<>(); + Map<String, String> segmentNameVsDownloadURL = new HashMap<>(); + + // maxNumRecordsPerTask is used to divide a minion tasks among + // multiple subtasks to improve performance. + int maxNumRecordsPerTask = + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null + ? Integer.parseInt( + taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)) + : DEFAULT_MAX_NUM_RECORDS_PER_TASK; + + List<SegmentZKMetadata> segmentsToBeScheduled; + + if (!prevMinionTaskSuccessful) { + segmentsToBeScheduled = segmentsToBeReProcessed; + } else { + // if all offline segments of prev minion tasks were successfully uploaded, + // we can clear the state of prev minion tasks as now it's useless. + if (!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(). + isEmpty()) { + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear(); + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear(); + // windowEndTime of prev minion task needs to be re-used for picking up the + // next windowStartTime. This is useful for case where user changes minion config + // after a minion task run was complete. So windowStartTime cannot be watermark + bucketMs + windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); } + long windowEndMs = windowStartMs + bucketMs; + // since window changed, pick new segments. 
+ segmentsToBeScheduled = + generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs, + bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata); + } - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); - long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); - - // Check overlap with window - if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { - // If last completed segment is being used, make sure that segment crosses over end of window. - // In the absence of this check, CONSUMING segments could contain some portion of the window. That data - // would be skipped forever. - if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { - LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. 
Skipping task " - + "generation: {}", segmentName, taskType); - skipGenerate = true; - break; - } - segmentNames.add(segmentName); - downloadURLs.add(segmentZKMetadata.getDownloadUrl()); + divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL, + maxNumRecordsPerTask); + + if (segmentNamesGroupList.isEmpty()) { + continue; + } + + List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>(); + long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(); + long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(); + + LOGGER.info( + "generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType, + windowStartMs, + newWindowEndTime, realtimeTableName); + + for (List<String> segmentNameList : segmentNamesGroupList) { + List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL); + Preconditions.checkState(segmentNameList.size() == downloadURLList.size()); + pinotTaskConfigsForTable.add( + createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig, + newWindowStartTime, + newWindowEndTime, taskType)); + } + try { + _clusterInfoAccessor + .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, + MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + expectedVersion); + } catch (ZkBadVersionException e) { + LOGGER.error( + "Version changed while updating RTO task metadata for table: {}, skip scheduling. 
There are " + + "multiple task schedulers for the same table, need to investigate!", realtimeTableName); + // skip this table for this minion run + continue; + } + + pinotTaskConfigs.addAll(pinotTaskConfigsForTable); + + LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType); + } + return pinotTaskConfigs; + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, + "RealtimeToOfflineTask doesn't support upsert table!"); + // check no malformed period + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) + .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) + .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + + Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskConfigs.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState(columnNames.contains( + StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)), + String.format("Column \"%s\" not found in schema!", entry.getKey())); + try { + // check that it's a valid aggregation function type + 
AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); + // check that a value aggregator is available + if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { + throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); } + } catch (IllegalArgumentException e) { + String err = + String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); + throw new IllegalStateException(err); } - if (skipGenerate || !segmentNames.isEmpty()) { - break; - } + } + } + } - LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket", - taskType, windowStartMs, windowEndMs); - windowStartMs = windowEndMs; - windowEndMs += bucketMs; + private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) { + List<String> downloadURLList = new ArrayList<>(); + for (String segmentName : segmentNameList) { + downloadURLList.add(segmentNameVsDownloadURL.get(segmentName)); + } + return downloadURLList; + } + + private void deleteInvalidOfflineSegments(String offlineTableName, + Set<String> realtimeSegmentsToBeReProcessed, + Set<String> existingOfflineTableSegmentNames, + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) { + + Map<String, String> segmentNameToExpectedSubtaskResultID = + realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID(); + Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = + realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap(); + + Set<String> segmentsToBeDeleted = new HashSet<>(); + + for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) { + String id = segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName); + Preconditions.checkNotNull(id); + ExpectedSubtaskResult expectedSubtaskResult = + 
expectedSubtaskResultMap.get(id); + // if already marked as failure, no need to delete again. + if (expectedSubtaskResult.isTaskFailure()) { + continue; } + List<String> expectedCorrespondingOfflineSegments = expectedSubtaskResult.getSegmentsTo(); + segmentsToBeDeleted.addAll( + getSegmentsToDelete(expectedCorrespondingOfflineSegments, existingOfflineTableSegmentNames)); + // The expectedRealtimeToOfflineTaskResult is confirmed to be + // related to a failed task. Mark it as a failure, since executor will + // then only replace expectedRealtimeToOfflineTaskResult for the + // segments to be reprocessed. + expectedSubtaskResult.setTaskFailure(); + } + + if (!segmentsToBeDeleted.isEmpty()) { Review Comment: It can fail here as well, I assume, but we would not re-issue deletes for already deleted segments. Can you confirm that the actual deep store deletion happens async and only metadata (Zk, ideal state) gets removed. This is to make sure that this step does not hog the controller. ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java: ########## @@ -287,77 +585,69 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe * If the znode is null, computes the watermark using either the start time config or the start time from segment * metadata */ - private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata, - long bucketMs) { - ZNRecord realtimeToOfflineZNRecord = - _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, - realtimeTableName); - RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = - realtimeToOfflineZNRecord != null ? 
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord( - realtimeToOfflineZNRecord) : null; - - if (realtimeToOfflineSegmentsTaskMetadata == null) { - // No ZNode exists. Cold-start. - long watermarkMs; - - // Find the smallest time from all segments - long minStartTimeMs = Long.MAX_VALUE; - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs()); - } - Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE); + private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String realtimeTableName, + List<SegmentZKMetadata> completedSegmentsZKMetadata, + long bucketMs, ZNRecord realtimeToOfflineZNRecord) { + + if (realtimeToOfflineZNRecord != null) { + return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord( + realtimeToOfflineZNRecord); + } - // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries - // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window - // [20200813, 20200814) - watermarkMs = (minStartTimeMs / bucketMs) * bucketMs; + // No ZNode exists. Cold-start. 
+ long watermarkMs; - // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above - realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs); - _clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata, - MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1); + // Find the smallest time from all segments + long minStartTimeMs = Long.MAX_VALUE; + for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { + minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs()); } - return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(); - } + Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE); - @Override - public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { - // check table is not upsert - Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, - "RealtimeToOfflineTask doesn't support upsert table!"); - // check no malformed period - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); - TimeUtils.convertPeriodToMillis( - taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); - // check mergeType is correct - Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) - .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) - .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + // Round off according to the bucket. 
This ensures we align the offline segments to proper time boundaries + // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window + // [20200813, 20200814) + watermarkMs = (minStartTimeMs / bucketMs) * bucketMs; - Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); - // check no mis-configured columns - Set<String> columnNames = schema.getColumnNames(); + return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs); + } + + private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList, List<String> downloadURLList, + String realtimeTableName, Map<String, String> taskConfigs, TableConfig tableConfig, long windowStartMs, + long windowEndMs, String taskType) { + + Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs, + _clusterInfoAccessor); + configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList)); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR)); Review Comment: Should the order of download url list match segmentNameList, so that minion correctly uses the download url ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org