Jackie-Jiang commented on a change in pull request #6124: URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r505756502
########## File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.ZNRecord; +import org.apache.pinot.spi.utils.JsonUtils; + + +/** + * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>. + * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed. 
+ * + * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * + * PinotTaskGenerator: + * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>, + * to determine the window of execution for the task it is generating. + * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize) + * + * PinotTaskExecutor: + * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: + * - Verify that it is running the latest task scheduled by the task generator + * - Update the watermark as the end of the window that it executed for + */ +@JsonIgnoreProperties(ignoreUnknown = true) Review comment: This class does not need to be json compatible, as we always read/write it through `ZNRecord` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java ########## @@ -93,6 +97,38 @@ public Schema getTableSchema(String tableName) { .getRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName); } + /** + * Get all segment metadata for the given low-level REALTIME table name. 
+ * + * @param tableName Table name with or without REALTIME type suffix + * @return List of segment metadata + */ + public List<LLCRealtimeSegmentZKMetadata> getLLCRealtimeSegmentsMetadata(String tableName) { + return ZKMetadataProvider + .getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName); + } + + /** + * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table + * @param tableNameWithType realtime table name + */ + public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata( + String tableNameWithType) { + return MinionTaskMetadataUtils + .getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), + MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType); + } + + /** + * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA + * This call will override any previous metadata node + */ + public void setRealtimeToOfflineSegmentsTaskMetadata( Review comment: Let's not put write methods in this class. This class should only contain methods for reading metadata. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. 
+ * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. + * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String 
tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } Review comment: Also skip HLC table here? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java ########## @@ -101,14 +101,14 @@ public void map() while (segmentRecordReader.hasNext()) { reusableRow = segmentRecordReader.next(reusableRow); - // Record transformation - reusableRow = _recordTransformer.transformRecord(reusableRow); - // Record filtering if (_recordFilter.filter(reusableRow)) { continue; } + // Record transformation Review comment: Do we do filtering before the transformation? Can filter be applied to the transformed columns? ########## File path: pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java ########## @@ -275,7 +275,7 @@ public void segmentMapperTest(String mapperId, SegmentMapperConfig segmentMapper SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema, new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(), new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION) - .setFilterFunction("Groovy({timeValue != 1597795200000}, timeValue)").build(), Lists.newArrayList( + .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList( Review comment: Is this related to this PR? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java ########## @@ -62,16 +62,35 @@ private MinionConstants() { public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey"; } + /** + * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table + */ public static class RealtimeToOfflineSegmentsTask { public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask"; - // window + + /** + * The time window size for the task. + * e.g. if set to "1d", then task is scheduled to run for a 1 day window + */ + public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod"; + /** + * The time period to wait before picking segments for this task + * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days + */ + public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; + /** + * Config to manually provide start time for the very first task scheduled. + * In the absence of this config, the very first window start is calculated as min(start time of all completed segments) + */ + public static final String START_TIME_MILLIS_KEY = "startTimeMillis"; Review comment: Do we need this config? This config can cause inconsistent query result when pushing the first offline segments, where the old real-time records that are not merged will be skipped. Is there any use case we want to configure this value? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for 
generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. + * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + 
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType); + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + // Only schedule 1 task of this type, per table + Map<String, TaskState> nonCompletedTasks = + TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider); + if (!nonCompletedTasks.isEmpty()) { + LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.", + nonCompletedTasks.keySet(), realtimeTableName); + continue; + } + + // Get all completed segment metadata. + List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>(); + Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>(); + Set<Integer> allPartitions = new HashSet<>(); + getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName, + allPartitions); + if (completedSegmentsMetadata.isEmpty()) { + LOGGER + .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName, + taskType); + continue; + } + allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet()); + if (!allPartitions.isEmpty()) { + LOGGER + .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. 
Skipping task generation.", + allPartitions, realtimeTableName, taskType); + continue; + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + Preconditions.checkState(tableTaskConfig != null); + Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType); + Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: {}", tableName); + + // Get the bucket size and buffer + String bucketTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD); + String bufferTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); + String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY); + long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr); + long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr); + + // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = windowStart + bucket. + long windowStartMillis = + getWatermarkMillis(realtimeTableName, completedSegmentsMetadata, startTimeStr, bucketMillis); + long windowEndMillis = windowStartMillis + bucketMillis; + + // Check that execution window is older than bufferTime + if (windowEndMillis > System.currentTimeMillis() - bufferMillis) { + LOGGER.info( + "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. 
Skipping task generation: {}", + windowStartMillis, windowEndMillis, bufferMillis, bufferTimeStr, taskType); + continue; + } + + // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive) + List<String> segmentNames = new ArrayList<>(); + List<String> downloadURLs = new ArrayList<>(); + Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values()); + boolean skipGenerate = false; + for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) { + String segmentName = realtimeSegmentZKMetadata.getSegmentName(); + TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit(); + long segmentStartTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime()); + long segmentEndTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime()); + + // Check overlap with window + if (windowStartMillis <= segmentEndTimeMillis && segmentStartTimeMillis < windowEndMillis) { + // If last completed segment is being used, make sure that segment crosses over end of window. Review comment: +1 ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. 
+ * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. + * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != 
TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + // Only schedule 1 task of this type, per table + Map<String, TaskState> nonCompletedTasks = + TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider); + if (!nonCompletedTasks.isEmpty()) { + LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.", + nonCompletedTasks.keySet(), realtimeTableName); Review comment: IMO it is fine to log one warning per table. It is quite a critical issue if the minion gets stuck on this job (data loss if it doesn't move the segments on time), so we should detect it as soon as possible. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. 
+ * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType); + + String 
rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + // Only schedule 1 task of this type, per table + Map<String, TaskState> nonCompletedTasks = + TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider); + if (!nonCompletedTasks.isEmpty()) { + LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.", + nonCompletedTasks.keySet(), realtimeTableName); + continue; + } + + // Get all completed segment metadata. + List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>(); + Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>(); + Set<Integer> allPartitions = new HashSet<>(); + getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName, + allPartitions); + if (completedSegmentsMetadata.isEmpty()) { + LOGGER + .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName, + taskType); + continue; + } + allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet()); + if (!allPartitions.isEmpty()) { + LOGGER + .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. 
Skipping task generation.", + allPartitions, realtimeTableName, taskType); + continue; + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + Preconditions.checkState(tableTaskConfig != null); + Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType); + Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: {}", tableName); + + // Get the bucket size and buffer + String bucketTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD); + String bufferTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); Review comment: ```suggestion String bucketTimePeriod = taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD); String bufferTimePeriod = taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. 
+ * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType); + + String 
rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); Review comment: (nit) No need to extract raw table name ```suggestion String realtimeTableName = tableConfig.getTableName(); if (tableConfig.getTableType() != TableType.REALTIME) { LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, realtimeTableName); continue; } LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType); ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. 
+ * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType Review comment: Reformat this part of the javadoc (keep each line less than 120 characters) ########## File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.ZNRecord; +import org.apache.pinot.spi.utils.JsonUtils; + + +/** + * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>. + * The <code>watermarkMillis</code> denotes the time (exclusive) upto which tasks have been executed. 
+ * + * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * + * PinotTaskGenerator: + * The <code>watermarkMillis</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>, + * to determine the window of execution for the task it is generating. + * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize) + * + * PinotTaskExecutor: + * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: + * - Verify that is is running the latest task scheduled by the task generator + * - Update the watermark as the end of the window that it executed for + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class RealtimeToOfflineSegmentsTaskMetadata { + + private static final String WATERMARK_KEY = "watermarkMillis"; + + private final String _tableNameWithType; + private long _watermarkMillis; Review comment: Should we change this to final and make the metadata immutable? ########## File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java ########## @@ -26,6 +26,12 @@ */ public interface PinotTaskExecutor { + /** + * Pre processing operations to be done at the beginning of task execution + */ + default void preProcess(PinotTaskConfig pinotTaskConfig) { Review comment: I don't think we should put `preProcess()` and `postProcess()` into the interface. You can still keep them in the `RealtimeToOfflineSegmentsTaskExecutor` for better organization of the code. 
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java ########## @@ -68,4 +69,26 @@ } return runningSegments; } + + /** + * Gets all the tasks for the provided task type and tableName, which do not have TaskState COMPLETED + * @return map containing task name to task state for non-completed tasks + */ + public static Map<String, TaskState> getNonCompletedTasks(String taskType, String tableNameWithType, + ClusterInfoProvider clusterInfoProvider) { + + Map<String, TaskState> nonCompletedTasks = new HashMap<>(); + Map<String, TaskState> taskStates = clusterInfoProvider.getTaskStates(taskType); + for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) { + if (entry.getValue() == TaskState.COMPLETED) { + continue; + } Review comment: Also skip the old tasks? ########## File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java ########## @@ -162,11 +212,34 @@ for (File file : outputSegmentsDir.listFiles()) { String outputSegmentName = file.getName(); results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName) - .setTableNameWithType(tableNameWithType).build()); + .setTableNameWithType(offlineTableName).build()); } return results; } + /** + * Fetches the realtimeToOfflineSegmentsTask metadata ZNode for the realtime table. + * Checks that the version of the ZNode matches with the version cached earlier. 
If yes, proceeds to update watermark in the ZNode + * TODO: Making the minion task update the ZK metadata is an anti-pattern, however we cannot find another way to do it + */ + @Override + public void postProcess(PinotTaskConfig pinotTaskConfig) { + String realtimeTableName = pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY); + + ZNRecord realtimeToOfflineSegmentsTaskRecord = Review comment: No need to check the version again; `setRealtimeToOfflineSegmentsTaskMetadata()` can perform the version check ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. 
+ * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType); + + String 
rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + // Only schedule 1 task of this type, per table + Map<String, TaskState> nonCompletedTasks = + TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider); + if (!nonCompletedTasks.isEmpty()) { + LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.", + nonCompletedTasks.keySet(), realtimeTableName); + continue; + } + + // Get all completed segment metadata. + List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>(); + Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>(); + Set<Integer> allPartitions = new HashSet<>(); + getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName, + allPartitions); + if (completedSegmentsMetadata.isEmpty()) { + LOGGER + .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName, + taskType); + continue; + } + allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet()); + if (!allPartitions.isEmpty()) { + LOGGER + .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. 
Skipping task generation.", + allPartitions, realtimeTableName, taskType); + continue; + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + Preconditions.checkState(tableTaskConfig != null); + Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType); + Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: {}", tableName); + + // Get the bucket size and buffer + String bucketTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD); + String bufferTimeStr = + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); + String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY); + long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr); + long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr); Review comment: Same for other `Millis` ```suggestion long bucketTimeMs = TimeUtils.convertPeriodToMillis(bucketTimeStr); long bufferTimeMs = TimeUtils.convertPeriodToMillis(bufferTimeStr); ``` ########## File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java ########## @@ -26,6 +26,12 @@ */ public interface PinotTaskExecutor { + /** + * Pre processing operations to be done at the beginning of task execution + */ + default void preProcess(PinotTaskConfig pinotTaskConfig) { Review comment: > Why not make this a boolean return, so that if the pre-process fails we do not go further in the task execution? Not sure what helix does when we throw exceptions from the task executor -- probably retries the task, but that may not be good for us. I prefer throwing exception when something went wrong to force the caller to handle the exception, or the caller could just ignore the return value, which could cause unexpected behavior. 
Minion code can handle the exception and fail the task ########## File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java ########## @@ -54,17 +57,60 @@ * A task to convert segments from a REALTIME table to segments for its corresponding OFFLINE table. * The realtime segments could span across multiple time windows. This task extracts data and creates segments for a configured time range. * The {@link SegmentProcessorFramework} is used for the segment conversion, which also does - * 1. time column rollup - * 2. time window extraction using filter function + * 1. time window extraction using filter function + * 2. time column rollup * 3. partitioning using table config's segmentPartitioningConfig * 4. aggregations and rollup * 5. data sorting + * + * Before beginning the task, the <code>watermarkMillis</code> is checked in the minion task metadata ZNode, located at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/<tableNameWithType> + * It should match the <code>windowStartMillis</code>. + * The version of the znode is cached. + * + * After the segments are uploaded, this task updates the <code>watermarkMillis</code> in the minion task metadata ZNode. + * The znode version is checked during update, and update only succeeds if version matches with the previously cached version */ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class); private static final String INPUT_SEGMENTS_DIR = "input_segments"; private static final String OUTPUT_SEGMENTS_DIR = "output_segments"; + private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager; + private int _expectedVersion = -1; Review comment: Put `Integer.MIN_VALUE` as the default? 
We don't want to override the ZNRecord if this is not correctly set ########## File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.ZNRecord; +import org.apache.pinot.spi.utils.JsonUtils; + + +/** + * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>. + * The <code>watermarkMillis</code> denotes the time (exclusive) upto which tasks have been executed. 
+ * + * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * + * PinotTaskGenerator: + * The <code>watermarkMillis</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>, + * to determine the window of execution for the task it is generating. + * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize) + * + * PinotTaskExecutor: + * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: + * - Verify that is is running the latest task scheduled by the task generator + * - Update the watermark as the end of the window that it executed for + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class RealtimeToOfflineSegmentsTaskMetadata { + + private static final String WATERMARK_KEY = "watermarkMillis"; Review comment: I would suggest `watermarkMs` to be consistent with other classes ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java ########## @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.common.utils.CommonConstants.Segment; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask} + * + * These will be generated only for REALTIME tables. + * At any given time, only 1 task of this type should be generated for a table. + * + * Steps: + * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType + * In case of cold-start, no ZNode will exist. 
+ * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config) + * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis, + * where bucketTime can be provided in the taskConfigs (default 1d) + * - If the execution window is not older than bufferTimeMillis, no task will be generated, + * where bufferTime can be provided in the taskConfigs (default 2d) + * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis) + * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task + */ +public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + + private final ClusterInfoProvider _clusterInfoProvider; + + public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) { + _clusterInfoProvider = clusterInfoProvider; + } + + @Override + public String getTaskType() { + return RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String tableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.REALTIME) { + LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType); + + String 
rawTableName = TableNameBuilder.extractRawTableName(tableName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + // Only schedule 1 task of this type, per table + Map<String, TaskState> nonCompletedTasks = Review comment: We can keep the same logic as in `TaskGeneratorUtils.getRunningSegments()` to skip old tasks ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org