snleee commented on a change in pull request #7283:
URL: https://github.com/apache/pinot/pull/7283#discussion_r688188698
##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,565 @@
...
+  private TableTaskConfig getSingleLevelConcatTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("100days.mergeType", "concat");
+    tableTaskConfigs.put("100days.bufferTimePeriod", "1d");
+    tableTaskConfigs.put("100days.bucketTimePeriod", "100d");
+    tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
+    tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
+    tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
+    tableTaskConfigs.put("WeatherDelay.aggregationType", "min");

Review comment:
   Can we test the 2nd aggregation type? Maybe we can use `sum`
##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,565 @@
...
+  /**
+   * Test single level rollup task with duplicate data
+   */
+  @Test
+  public void testSingleLevelRollup() throws Exception {
+    // The original segments are time partitioned by month:
+    // segmentName (totalDocs)
+    // myTable2_16071_16101_3_1, myTable2_16071_16101_3_2 (9746)
...
+    // myTable2_16405_16435_2_1, myTable2_16405_16435_2_2 (9378)
...
+      // Check num total doc of merged segments are the same as original segment

Review comment:
   Why is the number of rows the same for the roll-up test? Roll-up can aggregate rows, so the result may be smaller. I think we should test a case where the number of rows is reduced after the roll-up. I thought we agreed to push the same data x2 and run the roll-up.
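A minimal sketch of the kind of assertion the comment above is asking for, meant to sit inside `testSingleLevelRollup` and reusing the `postSqlQuery(...)` helper and `_brokerBaseApiUrl` field already used in the quoted diff; the response path (`resultTable.rows[0][0]`) and the before/after comparison are assumptions for illustration, not code from the PR:

```java
// Sketch only: with every row pushed twice (the "_1"/"_2" segment copies), count(*)
// before the roll-up returns 2x the single-copy row count. A roll-up that aggregates
// the exact-duplicate rows should leave strictly fewer physical rows afterwards.
String countQuery = "SELECT count(*) FROM myTable2";
long countBeforeRollup = postSqlQuery(countQuery, _brokerBaseApiUrl)
    .get("resultTable").get("rows").get(0).get(0).asLong();

// ... schedule the MergeRollupTask and waitForTaskToComplete(), as the test already does ...

long countAfterRollup = postSqlQuery(countQuery, _brokerBaseApiUrl)
    .get("resultTable").get("rows").get(0).get(0).asLong();
assertTrue(countAfterRollup < countBeforeRollup,
    "Roll-up of duplicated data should reduce the total row count");
```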
##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,565 @@
...
+    // 3.
+    // {merged_150days_1628644088146_0_myTable2_16352_16373_1, myTable2_16374_16404_1_1, myTable2_16374_16404_1_2
+    //  myTable2_16405_16435_2_1, myTable2_16405_16435_2_2}
+    // -> {merged_150days_1628644105127_0_myTable2_16352_16429_0}
+
+    String sqlQuery = "SELECT count(*) FROM myTable2 LIMIT 200000"; // 115545 rows for the test table

Review comment:
   We can remove the `LIMIT` clause since we are always fetching 1 row.
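For reference, the query as the comment suggests it could be written (illustrative only; the `// 115545 rows` note comes from the PR's original line):

```java
// count(*) always returns a single row, so no LIMIT clause is needed.
String sqlQuery = "SELECT count(*) FROM myTable2"; // 115545 rows for the test table
```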
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org