snleee commented on code in PR #10463: URL: https://github.com/apache/pinot/pull/10463#discussion_r1223802703
########## pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java: ########## @@ -136,4 +136,18 @@ public static class SegmentGenerationAndPushTask { public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; } + + public static class UpsertCompactionTask { + public static final String TASK_TYPE = "UpsertCompactionTask"; + /** + * The time period to wait before picking segments for this task + * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days + */ + public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; + /** + * The maximum amount of old records allowed for an existing segment. + * e.g. if the current amount surpasses 2500000, then the segment will be compacted Review Comment: Let's fix the comment. ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java: ########## @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@TaskGenerator +public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class); + private static final String DEFAULT_BUFFER_PERIOD = "7d"; + private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 10.0; Review Comment: Let's start with a bit more conservative number? e.g. 
30% ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java: ########## @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static 
org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; + + +public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest { + protected PinotHelixTaskResourceManager _helixTaskResourceManager; + protected PinotTaskManager _taskManager; + private static final String PRIMARY_KEY_COL = "clientId"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME); + + @Override + protected String getSchemaFileName() { + return "upsert_table_test.schema"; + } + + @Override + protected String getSchemaName() { + return "upsertSchema"; + } + + @Override + protected String getAvroTarFileName() { + return "upsert_test.tar.gz"; + } + + @Override + protected String getPartitionColumn() { + return PRIMARY_KEY_COL; + } + + private TableTaskConfig getCompactionTaskConfig() { + Map<String, String> tableTaskConfigs = new HashMap<>(); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d"); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "0"); + return new TableTaskConfig( + Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs)); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(1); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Start Kafka and push data into Kafka + startKafka(); + pushAvroIntoKafka(avroFiles); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions()); + tableConfig.setTaskConfig(getCompactionTaskConfig()); + 
addTableConfig(tableConfig); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), TableType.REALTIME, _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + + startMinion(); + _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + _taskManager = _controllerStarter.getTaskManager(); + } + + @Override + protected void waitForAllDocsLoaded(long timeoutMs) + throws Exception { + TestUtils.waitForCondition(aVoid -> { + try { + return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert(); + } catch (Exception e) { + return null; + } + }, 100L, timeoutMs, "Failed to load all documents"); + assertEquals(getCurrentCountStarResult(), getCountStarResult()); + } + + private long getCountStarResultWithoutUpsert() { Review Comment: I think that we can simply do ``` assertEquals(getCurrentCountStarResult(), 600); ``` instead of adding functions for each expected value? ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java: ########## @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; + + +public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest { + protected PinotHelixTaskResourceManager _helixTaskResourceManager; + protected PinotTaskManager _taskManager; + private static final String PRIMARY_KEY_COL = "clientId"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME); + + @Override + protected String getSchemaFileName() { + return "upsert_table_test.schema"; + } + + @Override + protected String getSchemaName() { + return "upsertSchema"; + } + + @Override + protected String getAvroTarFileName() { + return "upsert_test.tar.gz"; + } + + @Override + protected String getPartitionColumn() { + return PRIMARY_KEY_COL; + } + + private 
TableTaskConfig getCompactionTaskConfig() { + Map<String, String> tableTaskConfigs = new HashMap<>(); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d"); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "0"); + return new TableTaskConfig( + Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs)); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(1); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Start Kafka and push data into Kafka + startKafka(); + pushAvroIntoKafka(avroFiles); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions()); + tableConfig.setTaskConfig(getCompactionTaskConfig()); + addTableConfig(tableConfig); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), TableType.REALTIME, _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + + startMinion(); + _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + _taskManager = _controllerStarter.getTaskManager(); + } + + @Override + protected void waitForAllDocsLoaded(long timeoutMs) + throws Exception { + TestUtils.waitForCondition(aVoid -> { + try { + return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert(); + } catch (Exception e) { + return null; + } + }, 100L, timeoutMs, "Failed to load all documents"); + assertEquals(getCurrentCountStarResult(), getCountStarResult()); + } + + private long 
getCountStarResultWithoutUpsert() { + // 3 Avro files, each with 100 documents, one copy from streaming source, one copy from batch source + return 600; + } + + private long getCurrentCountStarResultWithoutUpsert() { + return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)") + .getResultSet(0).getLong(0); + } + + @Override + protected long getCountStarResult() { + return 3; + } + + private long getCountStarResultAfterCompaction() { + return 300; + } + + @AfterClass + public void tearDown() + throws IOException { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + + // Test dropping all segments one by one + List<String> segments = listSegments(realtimeTableName); + assertFalse(segments.isEmpty()); + for (String segment : segments) { + dropSegment(realtimeTableName, segment); + } + // NOTE: There is a delay to remove the segment from property store + TestUtils.waitForCondition((aVoid) -> { + try { + return listSegments(realtimeTableName).isEmpty(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 60_000L, "Failed to drop the segments"); + + dropRealtimeTable(realtimeTableName); + stopMinion(); + stopServer(); + stopBroker(); + stopController(); + stopKafka(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + @Test + public void testCompaction() { Review Comment: Can we add some extra test to check some value equality for some primary key to make sure that compaction doesn't change the query correctness? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java: ########## @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@TaskGenerator +public class UpsertCompactionTaskGenerator 
extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class); + private static final String DEFAULT_BUFFER_PERIOD = "7d"; + private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 10.0; + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactionTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + for (TableConfig tableConfig: tableConfigs) { + if (!validate(tableConfig)) { + continue; + } + + String tableNameWithType = tableConfig.getTableName(); + LOGGER.info("Start generating task configs for table: {} for task: {}", + tableNameWithType, taskType); + + Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType); + Map<String, String> compactionConfigs = getCompactionConfigs(taskConfigs); + List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, compactionConfigs); + + if (completedSegments.isEmpty()) { + LOGGER.info("No completed segments were available for compaction for table: {}", tableNameWithType); + continue; + } + + // get server to segment mappings + Map<String, List<String>> serverToSegments = _clusterInfoAccessor.getServerToSegmentsMap(tableNameWithType); + Map<String, String> segmentToServer = getSegmentToServer(serverToSegments); + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + BiMap<String, String> serverToEndpoints; + try { + serverToEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + } catch (InvalidConfigException e) { + throw new RuntimeException(e); + } + + Map<String, SegmentZKMetadata> urlToSegment; + try { + urlToSegment = + getUrlToSegmentMappings(tableNameWithType, 
completedSegments, segmentToServer, serverToEndpoints); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + // request the urls from the servers + CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper( + Executors.newCachedThreadPool(), new MultiThreadedHttpConnectionManager(), serverToEndpoints.inverse()); + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest( + new ArrayList<>(urlToSegment.keySet()), tableNameWithType, true, 3000); + + // only compact segments that exceed the threshold + double invalidRecordsThresholdPercent = + Double.parseDouble(compactionConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, + String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT))); + List<SegmentZKMetadata> selectedSegments = new ArrayList<>(); + for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { + double invalidRecordCount = Double.parseDouble(streamResponse.getValue()); + SegmentZKMetadata segment = urlToSegment.get(streamResponse.getKey()); + double invalidRecordPercent = (invalidRecordCount / segment.getTotalDocs()) * 100; + if (invalidRecordPercent > invalidRecordsThresholdPercent) { + selectedSegments.add(segment); + } + } + + int numTasks = 0; + int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs); + for (SegmentZKMetadata selectedSegment : selectedSegments) { + if (numTasks == maxTasks) { + break; + } + Map<String, String> configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType); + configs.put(MinionConstants.SEGMENT_NAME_KEY, selectedSegment.getSegmentName()); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, selectedSegment.getDownloadUrl()); + configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(selectedSegment.getCrc())); + 
pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs)); + numTasks++; + } + LOGGER.info("Finished generating {} tasks configs for table: {} " + "for task: {}", + numTasks, tableNameWithType, taskType); + } + return pinotTaskConfigs; + } + + private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, + Map<String, String> compactionConfigs) { + List<SegmentZKMetadata> completedSegments = new ArrayList<>(); + String bufferPeriod = compactionConfigs.getOrDefault( + UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); + long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod); + List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + for (SegmentZKMetadata segment : allSegments) { + CommonConstants.Segment.Realtime.Status status = segment.getStatus(); + // initial segments selection based on status and age + if (status.isCompleted()) { + boolean endedWithinBufferPeriod = segment.getEndTimeMs() <= (System.currentTimeMillis() - bufferMs); + boolean endsInTheFuture = segment.getEndTimeMs() > System.currentTimeMillis(); Review Comment: What's the rationale for adding segments that end in the future to the completed segments? ########## pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java: ########## @@ -464,6 +464,49 @@ public Response downloadValidDocIds( } } + @GET + @Path("/tables/{tableNameWithType}/segments/{segmentName}/invalidRecordCount") Review Comment: Let's rename the API to `/validDocIdMetadata` and add a bit more information to the response: 1. total doc ids for segment 2. valid doc id count 3. invalid doc id count ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nullable; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import 
org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class); + private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager(); + private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool(); + private static String _clusterName = _helixManager.getClusterName(); + + private class CompactedRecordReader implements RecordReader { + private final PinotSegmentRecordReader _pinotSegmentRecordReader; + private final PeekableIntIterator _validDocIdsIterator; + // Reusable generic row to store the next row to return + GenericRow _nextRow = new GenericRow(); + // Flag to mark whether we need to fetch another row + boolean _nextRowReturned = true; + // Flag to mark whether all records have been iterated + boolean _finished = false; + + CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) { + _pinotSegmentRecordReader = new PinotSegmentRecordReader(); + _pinotSegmentRecordReader.init(indexDir, null, null); + _validDocIdsIterator = validDocIds.getIntIterator(); + } + + @Override + public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) { + } + + @Override + public boolean hasNext() { + if (_finished) { + return false; + } + + // If next row 
has not been returned, return true + if (!_nextRowReturned) { + return true; + } + + // Try to get the next row to return + if (_validDocIdsIterator.hasNext()) { + int docId = _validDocIdsIterator.next(); + _nextRow.clear(); + _pinotSegmentRecordReader.getRecord(docId, _nextRow); + _nextRowReturned = false; + return true; + } + + // Cannot find next row to return, return false + _finished = true; + return false; + } + + @Override + public GenericRow next() { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) { + Preconditions.checkState(!_nextRowReturned); + reuse.init(_nextRow); + _nextRowReturned = true; + return reuse; + } + + @Override + public void rewind() { + _pinotSegmentRecordReader.rewind(); + _nextRowReturned = true; + _finished = false; + } + + @Override + public void close() + throws IOException { + _pinotSegmentRecordReader.close(); + } + } + + @Override + protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) + throws Exception { + _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + indexDir); + Map<String, String> configs = pinotTaskConfig.getConfigs(); + String taskType = pinotTaskConfig.getTaskType(); + LOGGER.info("Starting task: {} with configs: {}", taskType, configs); + long startMillis = System.currentTimeMillis(); + + String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); + TableConfig tableConfig = getTableConfig(tableNameWithType); + ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs); + + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + String segmentName = segmentMetadata.getName(); + try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) { + SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName); + SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl(); + driver.init(config, compactedRecordReader); + driver.build(); + } + + File compactedSegmentFile = new File(workingDir, segmentName); + + SegmentConversionResult result = new SegmentConversionResult.Builder() + .setFile(compactedSegmentFile) + .setTableNameWithType(tableNameWithType) + .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)) + .build(); + + long endMillis = System.currentTimeMillis(); + LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis)); + + return result; + } + + private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig, + SegmentMetadataImpl segmentMetadata, String segmentName) { + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, segmentMetadata.getSchema()); + config.setOutDir(workingDir.getPath()); + config.setSegmentName(segmentName); + // Keep index creation time the same as original segment because both segments use the same raw data. + // This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to + // identify if the new pushed segment has newer data than the existing one. + config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime())); + + // The time column type info is not stored in the segment metadata. + // Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT). 
+ if (segmentMetadata.getTimeInterval() != null) { + config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName()); + config.setStartTime(Long.toString(segmentMetadata.getStartTime())); + config.setEndTime(Long.toString(segmentMetadata.getEndTime())); + config.setSegmentTimeUnit(segmentMetadata.getTimeUnit()); + } + return config; + } + + private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs) Review Comment: Can we add this functionality to `ServerSegmentMetadataReader`? If we move this logic to the `ServerSegmentMetadataReader`, other tasks will be able to leverage it to fetch valid doc ids. Please take a look at `TableMetadataReader` and `ServerSegmentMetadataReader`. If it's too large a change, let's at least add the `TODO` comment. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -527,6 +528,19 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue())); } } + } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) { + // check table is realtime + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "UpsertCompactionTask only supports realtime tables!"); + // check upsert enabled + Preconditions.checkState(tableConfig.isUpsertEnabled(), + "Upsert must be enabled for UpsertCompactionTask"); + // check no malformed period + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d")); Review Comment: I think that we should add the precondition check here? 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nullable; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor { + private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class); + private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager(); + private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool(); + private static String _clusterName = _helixManager.getClusterName(); + + private class CompactedRecordReader implements RecordReader { + private final PinotSegmentRecordReader _pinotSegmentRecordReader; + private final PeekableIntIterator _validDocIdsIterator; + // Reusable generic row to store the next row to return + GenericRow _nextRow = new GenericRow(); + // Flag to mark whether we need to fetch another row + boolean _nextRowReturned = true; + // Flag to mark whether all records have been iterated + boolean _finished = false; + + CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) { + _pinotSegmentRecordReader = new PinotSegmentRecordReader(); + _pinotSegmentRecordReader.init(indexDir, null, null); + _validDocIdsIterator = validDocIds.getIntIterator(); + } + + @Override + public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) { + } + + @Override + public boolean hasNext() { + if (_finished) { Review Comment: I think that `_finished` may not be needed. _finished = (!_validDocIdsIterator.hasNext() && _nextRowReturned) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org