Jackie-Jiang commented on a change in pull request #6094: URL: https://github.com/apache/incubator-pinot/pull/6094#discussion_r523350741
########## File path: pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java ########## @@ -57,7 +59,7 @@ private static final File WORKING_DIR = new File(TEMP_DIR, "workingDir"); private static final int NUM_SEGMENTS = 10; private static final int NUM_ROWS = 5; - private static final String MERGED_SEGMENT_NAME = "testMergedSegment"; + private static final String SEGMENT_NAME = "testMergedSegment"; Review comment: (nit) ```suggestion private static final String SEGMENT_NAME = "testSegment"; ``` ########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MinionTasksIntegrationTest.java ########## @@ -116,9 +123,6 @@ public void testConvertToRawIndexTask() // Should generate one more ConvertToRawIndexTask task with 3 child tasks Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE)); - // Should not generate more tasks Review comment: Why is this part being removed? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/mergestrategy/TimeBasedMergeStrategy.java ########## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.mergestrategy; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.operator.transform.transformer.timeunit.CustomTimeUnitTransformer; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Time based merge strategy. + * + * When the segments are correctly time aligned (e.g. each segment contains exactly 1 day's data), the time based merge + * strategy allows to merge segments based on the segment granularity. + * + * For example, the merge strategy will try to merge segments based on the daily bucket if the segment granularity is + * set to 'DAILY' (i.e. only segments from the same day can be considered to be merged). + * + * Default is 'NONE' where the strategy would try to merge everything into a single time bucket. 
+ */ +public class TimeBasedMergeStrategy implements MergeStrategy { + + public enum SegmentGranularity { + NONE, DAYS, WEEKS, MONTHS + } + + private static final Logger LOGGER = LoggerFactory.getLogger(TimeBasedMergeStrategy.class); + + private SegmentGranularity _segmentGranularity; + + public TimeBasedMergeStrategy(Map<String, String> taskConfigs) { + String segmentGranularityStr = taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GRANULARITY); + try { + _segmentGranularity = SegmentGranularity.valueOf(segmentGranularityStr); + } catch (Exception e) { + _segmentGranularity = SegmentGranularity.NONE; + } + LOGGER.info("Time based merge strategy is initialized with the segment granularity : {}", _segmentGranularity); + } + + @Override + public List<List<SegmentZKMetadata>> generateMergeTaskCandidates(List<SegmentZKMetadata> segmentsToMerge, Review comment: Per the design doc, each time we only merge segments for the earliest one day/week/month, where segments that cross the time boundary are split into 2 parts. Am I missing something here? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/mergestrategy/TimeBasedMergeStrategy.java ########## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.mergestrategy; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.operator.transform.transformer.timeunit.CustomTimeUnitTransformer; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Time based merge strategy. + * + * When the segments are correctly time aligned (e.g. each segment contains exactly 1 day's data), the time based merge + * strategy allows to merge segments based on the segment granularity. + * + * For example, the merge strategy will try to merge segments based on the daily bucket if the segment granularity is + * set to 'DAILY' (i.e. only segments from the same day can be considered to be merged). + * + * Default is 'NONE' where the strategy would try to merge everything into a single time bucket. Review comment: Should we allow `NONE`? Merging all segments into one each time we run the task generator doesn't seem correct to me ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/mergestrategy/TimeBasedMergeStrategy.java ########## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.mergestrategy; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.operator.transform.transformer.timeunit.CustomTimeUnitTransformer; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Time based merge strategy. + * + * When the segments are correctly time aligned (e.g. each segment contains exactly 1 day's data), the time based merge + * strategy allows to merge segments based on the segment granularity. + * + * For example, the merge strategy will try to merge segments based on the daily bucket if the segment granularity is + * set to 'DAILY' (i.e. only segments from the same day can be considered to be merged). Review comment: `DAILY` or `DAYS`? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/mergestrategy/TimeBasedMergeStrategy.java ########## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.mergestrategy; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.operator.transform.transformer.timeunit.CustomTimeUnitTransformer; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Time based merge strategy. + * + * When the segments are correctly time aligned (e.g. each segment contains exactly 1 day's data), the time based merge + * strategy allows to merge segments based on the segment granularity. + * + * For example, the merge strategy will try to merge segments based on the daily bucket if the segment granularity is + * set to 'DAILY' (i.e. only segments from the same day can be considered to be merged). + * + * Default is 'NONE' where the strategy would try to merge everything into a single time bucket. 
+ */ +public class TimeBasedMergeStrategy implements MergeStrategy { + + public enum SegmentGranularity { + NONE, DAYS, WEEKS, MONTHS + } + + private static final Logger LOGGER = LoggerFactory.getLogger(TimeBasedMergeStrategy.class); + + private SegmentGranularity _segmentGranularity; + + public TimeBasedMergeStrategy(Map<String, String> taskConfigs) { + String segmentGranularityStr = taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GRANULARITY); + try { + _segmentGranularity = SegmentGranularity.valueOf(segmentGranularityStr); + } catch (Exception e) { + _segmentGranularity = SegmentGranularity.NONE; + } + LOGGER.info("Time based merge strategy is initialized with the segment granularity : {}", _segmentGranularity); + } + + @Override + public List<List<SegmentZKMetadata>> generateMergeTaskCandidates(List<SegmentZKMetadata> segmentsToMerge, + int maxNumSegmentsPerTask) { + Map<Long, List<SegmentZKMetadata>> bucketedSegments = new TreeMap<>(); + + for (SegmentZKMetadata segmentZKMetadata : segmentsToMerge) { + TimeUnit timeUnit = segmentZKMetadata.getTimeUnit(); Review comment: Use the new API introduced in #6239 ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/mergestrategy/TimeBasedMergeStrategy.java ########## @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.mergestrategy; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.operator.transform.transformer.timeunit.CustomTimeUnitTransformer; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Time based merge strategy. + * + * When the segments are correctly time aligned (e.g. each segment contains exactly 1 day's data), the time based merge + * strategy allows to merge segments based on the segment granularity. + * + * For example, the merge strategy will try to merge segments based on the daily bucket if the segment granularity is + * set to 'DAILY' (i.e. only segments from the same day can be considered to be merged). + * + * Default is 'NONE' where the strategy would try to merge everything into a single time bucket. 
+ */ +public class TimeBasedMergeStrategy implements MergeStrategy { + + public enum SegmentGranularity { + NONE, DAYS, WEEKS, MONTHS + } + + private static final Logger LOGGER = LoggerFactory.getLogger(TimeBasedMergeStrategy.class); + + private SegmentGranularity _segmentGranularity; + + public TimeBasedMergeStrategy(Map<String, String> taskConfigs) { + String segmentGranularityStr = taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GRANULARITY); + try { + _segmentGranularity = SegmentGranularity.valueOf(segmentGranularityStr); + } catch (Exception e) { + _segmentGranularity = SegmentGranularity.NONE; + } + LOGGER.info("Time based merge strategy is initialized with the segment granularity : {}", _segmentGranularity); + } + + @Override + public List<List<SegmentZKMetadata>> generateMergeTaskCandidates(List<SegmentZKMetadata> segmentsToMerge, + int maxNumSegmentsPerTask) { + Map<Long, List<SegmentZKMetadata>> bucketedSegments = new TreeMap<>(); + + for (SegmentZKMetadata segmentZKMetadata : segmentsToMerge) { + TimeUnit timeUnit = segmentZKMetadata.getTimeUnit(); + long startTimeInMillis = timeUnit.toMillis(segmentZKMetadata.getStartTime()); + long endTimeInMillis = timeUnit.toMillis(segmentZKMetadata.getEndTime()); + + if (!TimeUtils.isValidTimeInterval(new Interval(startTimeInMillis, endTimeInMillis))) { + LOGGER.warn("Time interval is not valid. 
segmentName = {}, startTime = {}, endTime= {}", + segmentZKMetadata.getSegmentName(), startTimeInMillis, endTimeInMillis); + continue; + } + + switch (_segmentGranularity) { + case DAYS: + startTimeInMillis = TimeUnit.DAYS.convert(startTimeInMillis, TimeUnit.MILLISECONDS); + endTimeInMillis = TimeUnit.DAYS.convert(endTimeInMillis, TimeUnit.MILLISECONDS); + break; + case WEEKS: + CustomTimeUnitTransformer weeksTimeUnitTransformer = + new CustomTimeUnitTransformer(TimeUnit.MILLISECONDS, "WEEKS"); + long[] weeksInput = new long[]{startTimeInMillis, endTimeInMillis}; + long[] weeksOutput = new long[2]; + weeksTimeUnitTransformer.transform(weeksInput, weeksOutput, weeksInput.length); + startTimeInMillis = weeksOutput[0]; + endTimeInMillis = weeksOutput[1]; + break; + case MONTHS: + CustomTimeUnitTransformer monthsTimeUnitTransformer = + new CustomTimeUnitTransformer(TimeUnit.MILLISECONDS, "MONTHS"); + long[] monthsInput = new long[]{startTimeInMillis, endTimeInMillis}; + long[] monthsOutput = new long[2]; + monthsTimeUnitTransformer.transform(monthsInput, monthsOutput, monthsInput.length); + startTimeInMillis = monthsOutput[0]; + endTimeInMillis = monthsOutput[1]; + break; + default: + startTimeInMillis = 0; + endTimeInMillis = 0; + break; + } + + if (startTimeInMillis != endTimeInMillis) { + LOGGER.warn( Review comment: This is a normal case, and we should not log a warning. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org