This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 782c3c2df5 Remove ConvertToRawIndexTask (#10068) 782c3c2df5 is described below commit 782c3c2df59d2b173ba9ef595aeabd27cb00a332 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jan 6 08:32:46 2023 -0800 Remove ConvertToRawIndexTask (#10068) --- .../apache/pinot/core/common/MinionConstants.java | 5 - .../pinot/core/minion/RawIndexConverter.java | 263 --------------------- ...vertToRawIndexMinionClusterIntegrationTest.java | 211 ----------------- .../ConvertToRawIndexTaskExecutor.java | 58 ----- .../ConvertToRawIndexTaskExecutorFactory.java | 49 ---- .../ConvertToRawIndexTaskGenerator.java | 140 ----------- ...nvertToRowIndexTaskProgressObserverFactory.java | 33 --- .../plugin/minion/tasks/TaskRegistryTest.java | 26 +- 8 files changed, 12 insertions(+), 773 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index c4007ee561..86bfa4829d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -59,11 +59,6 @@ public class MinionConstants { public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks"; public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments"; - public static class ConvertToRawIndexTask { - public static final String TASK_TYPE = "ConvertToRawIndexTask"; - public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert"; - } - // Purges rows inside segment that match chosen criteria public static class PurgeTask { public static final String TASK_TYPE = "PurgeTask"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java deleted file mode 100644 index 49a786c340..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.minion; - -import com.google.common.base.Preconditions; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; -import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; -import org.apache.pinot.segment.local.utils.CrcUtils; -import org.apache.pinot.segment.spi.ColumnMetadata; -import org.apache.pinot.segment.spi.ImmutableSegment; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.V1Constants; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; -import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider; -import org.apache.pinot.segment.spi.creator.IndexCreationContext; -import org.apache.pinot.segment.spi.creator.SegmentVersion; -import org.apache.pinot.segment.spi.datasource.DataSource; -import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; -import org.apache.pinot.segment.spi.index.reader.Dictionary; -import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; -import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; -import org.apache.pinot.spi.data.DimensionFieldSpec; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.apache.pinot.spi.data.MetricFieldSpec; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.utils.ReadMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The <code>RawIndexConverter</code> class takes a segment and converts the dictionary-based indexes inside the segment - * into raw indexes. - * <ul> - * <li> - * If columns to convert are specified, check whether their dictionary-based indexes exist and convert them. - * </li> - * <li> - * If not specified, for each metric column, calculate the size of dictionary-based index and uncompressed raw - * index. If the size of raw index is smaller or equal to (size of dictionary-based index * CONVERSION_THRESHOLD), - * convert it. - * </li> - * </ul> - * <p>After the conversion, add "rawIndex" into the segment metadata "optimizations" field. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class RawIndexConverter { - private static final Logger LOGGER = LoggerFactory.getLogger(RawIndexConverter.class); - - // Threshold for the ratio of uncompressed raw index size and dictionary-based index size to trigger conversion - private static final int CONVERSION_THRESHOLD = 4; - - // BITS_PER_ELEMENT is not applicable for raw index - private static final int BITS_PER_ELEMENT_FOR_RAW_INDEX = -1; - - private final String _rawTableName; - private final ImmutableSegment _originalImmutableSegment; - private final SegmentMetadata _originalSegmentMetadata; - private final File _convertedIndexDir; - private final PropertiesConfiguration _convertedProperties; - private final String _columnsToConvert; - private final ForwardIndexCreatorProvider _indexCreatorProvider; - - /** - * NOTE: original segment should be in V1 format. - * TODO: support V3 format - */ - public RawIndexConverter(String rawTableName, File originalIndexDir, File convertedIndexDir, - @Nullable String columnsToConvert, ForwardIndexCreatorProvider indexCreatorProvider) - throws Exception { - FileUtils.copyDirectory(originalIndexDir, convertedIndexDir); - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); - indexLoadingConfig.setSegmentVersion(SegmentVersion.v1); - indexLoadingConfig.setReadMode(ReadMode.mmap); - _rawTableName = rawTableName; - _originalImmutableSegment = ImmutableSegmentLoader.load(originalIndexDir, indexLoadingConfig, null, false); - _originalSegmentMetadata = _originalImmutableSegment.getSegmentMetadata(); - _convertedIndexDir = convertedIndexDir; - _convertedProperties = - new PropertiesConfiguration(new File(_convertedIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME)); - _columnsToConvert = columnsToConvert; - _indexCreatorProvider = indexCreatorProvider; - } - - public boolean convert() - throws Exception { - String segmentName = _originalSegmentMetadata.getName(); - LOGGER.info("Start converting segment: {} in table: {}", segmentName, _rawTableName); - - List<FieldSpec> columnsToConvert = new ArrayList<>(); - Schema schema = _originalSegmentMetadata.getSchema(); - if (_columnsToConvert == null) { - LOGGER.info("Columns to convert are not specified, check each metric column"); - for (MetricFieldSpec metricFieldSpec : schema.getMetricFieldSpecs()) { - if (_originalSegmentMetadata.getColumnMetadataFor(metricFieldSpec.getName()).hasDictionary() - && shouldConvertColumn(metricFieldSpec)) { - columnsToConvert.add(metricFieldSpec); - } - } - } else { - LOGGER.info("Columns to convert: {}", _columnsToConvert); - for (String columnToConvert : StringUtils.split(_columnsToConvert, ',')) { - FieldSpec fieldSpec = schema.getFieldSpecFor(columnToConvert); - if (fieldSpec == null) { - LOGGER.warn("Skip converting column: {} because is does not exist in the schema", columnsToConvert); - continue; - } - if (!fieldSpec.isSingleValueField()) { - LOGGER.warn("Skip converting column: {} because it's a multi-value column", columnsToConvert); - continue; - } - if (!_originalSegmentMetadata.getColumnMetadataFor(columnToConvert).hasDictionary()) { - LOGGER.warn("Skip converting column: {} because its index is not dictionary-based", columnsToConvert); - continue; - } - columnsToConvert.add(fieldSpec); - } - } - - if (columnsToConvert.isEmpty()) { - LOGGER.info("No column converted for segment: {} in table: {}", segmentName, _rawTableName); - return false; - } else { - // Convert columns - for (FieldSpec columnToConvert : columnsToConvert) { - convertColumn(columnToConvert); - } - _convertedProperties.save(); - - // Update creation metadata with new computed CRC and original segment creation time - SegmentIndexCreationDriverImpl - .persistCreationMeta(_convertedIndexDir, CrcUtils.forAllFilesInFolder(_convertedIndexDir).computeCrc(), - _originalSegmentMetadata.getIndexCreationTime()); - - LOGGER.info("{} columns converted for segment: {} in table: {}", columnsToConvert.size(), segmentName, - _rawTableName); - return true; - } - } - - private boolean shouldConvertColumn(FieldSpec fieldSpec) { - String columnName = fieldSpec.getName(); - DataType storedType = fieldSpec.getDataType().getStoredType(); - int numTotalDocs = _originalSegmentMetadata.getTotalDocs(); - ColumnMetadata columnMetadata = _originalSegmentMetadata.getColumnMetadataFor(columnName); - - int cardinality = columnMetadata.getCardinality(); - - // In bits - int lengthOfEachEntry; - if (storedType.isFixedWidth()) { - lengthOfEachEntry = storedType.size() * Byte.SIZE; - } else { - lengthOfEachEntry = columnMetadata.getColumnMaxLength() * Byte.SIZE; - } - long dictionaryBasedIndexSize = - (long) numTotalDocs * columnMetadata.getBitsPerElement() + (long) cardinality * lengthOfEachEntry; - long rawIndexSize = (long) numTotalDocs * lengthOfEachEntry; - LOGGER.info( - "For column: {}, size of dictionary based index: {} bits, size of raw index (without compression): {} bits", - columnName, dictionaryBasedIndexSize, rawIndexSize); - - return rawIndexSize <= dictionaryBasedIndexSize * CONVERSION_THRESHOLD; - } - - private void convertColumn(FieldSpec fieldSpec) - throws Exception { - String columnName = fieldSpec.getName(); - LOGGER.info("Converting column: {}", columnName); - - // Delete dictionary and existing indexes - FileUtils.deleteQuietly(new File(_convertedIndexDir, columnName + V1Constants.Dict.FILE_EXTENSION)); - FileUtils.deleteQuietly( - new File(_convertedIndexDir, columnName + V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION)); - FileUtils.deleteQuietly( - new File(_convertedIndexDir, columnName + V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION)); - FileUtils.deleteQuietly( - new File(_convertedIndexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION)); - - // Create the raw index - DataSource dataSource = _originalImmutableSegment.getDataSource(columnName); - ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex(); - Preconditions.checkState(forwardIndexReader != null, - "Forward index disabled for column: %s, raw index conversion operation unsupported!", columnName); - Dictionary dictionary = dataSource.getDictionary(); - assert dictionary != null; - DataType storedType = dictionary.getValueType(); - int numDocs = _originalSegmentMetadata.getTotalDocs(); - ColumnMetadata columnMetadata = _originalSegmentMetadata.getColumnMetadataFor(columnName); - try (ForwardIndexCreator rawIndexCreator = _indexCreatorProvider.newForwardIndexCreator( - IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata) - .withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue())) - .withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null)); - ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) { - switch (storedType) { - case INT: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - case LONG: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - case FLOAT: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - case DOUBLE: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - case STRING: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - case BYTES: - for (int docId = 0; docId < numDocs; docId++) { - rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext))); - } - break; - default: - throw new IllegalStateException(); - } - } - - // Update the segment metadata - _convertedProperties.setProperty( - V1Constants.MetadataKeys.Column.getKeyFor(columnName, V1Constants.MetadataKeys.Column.HAS_DICTIONARY), false); - _convertedProperties.setProperty( - V1Constants.MetadataKeys.Column.getKeyFor(columnName, V1Constants.MetadataKeys.Column.BITS_PER_ELEMENT), - BITS_PER_ELEMENT_FOR_RAW_INDEX); - } -} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java deleted file mode 100644 index b1df10ae0b..0000000000 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.integration.tests; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.commons.lang.StringUtils; -import org.apache.helix.task.TaskState; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; -import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; -import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.core.common.MinionConstants.ConvertToRawIndexTask; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.creator.SegmentVersion; -import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.spi.config.table.TableTaskConfig; -import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.util.TestUtils; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - - -/** - * Integration test that extends HybridClusterIntegrationTest and add Minions into the cluster to convert 3 metric - * columns' index into raw index for OFFLINE segments. - */ -public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridClusterIntegrationTest { - private static final String COLUMNS_TO_CONVERT = "ActualElapsedTime,ArrDelay,DepDelay,CRSDepTime"; - - private PinotHelixTaskResourceManager _helixTaskResourceManager; - private PinotTaskManager _taskManager; - - @Nullable - @Override - protected List<String> getNoDictionaryColumns() { - return null; - } - - // NOTE: Only allow converting raw index for v1 segment - @Override - protected String getSegmentVersion() { - return SegmentVersion.v1.name(); - } - - @Override - protected TableTaskConfig getTaskConfig() { - Map<String, String> convertToRawIndexTaskConfigs = new HashMap<>(); - convertToRawIndexTaskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "5"); - convertToRawIndexTaskConfigs.put(ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, COLUMNS_TO_CONVERT); - return new TableTaskConfig(Collections.singletonMap(ConvertToRawIndexTask.TASK_TYPE, convertToRawIndexTaskConfigs)); - } - - @BeforeClass - public void setUp() - throws Exception { - // The parent setUp() sets up Zookeeper, Kafka, controller, broker and servers - super.setUp(); - - startMinion(); - _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); - _taskManager = _controllerStarter.getTaskManager(); - } - - @Test - public void testConvertToRawIndexTask() - throws Exception { - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); - - File testDataDir = new File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR + "-0", offlineTableName); - if (!testDataDir.isDirectory()) { - testDataDir = new File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR + "-1", offlineTableName); - } - Assert.assertTrue(testDataDir.isDirectory()); - File tableDataDir = testDataDir; - - // Check that all columns have dictionary - // Skip the tmp directory since these are only temporary segments - FilenameFilter nonTmpFileFilter = new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return !name.equals("tmp"); - } - }; - File[] indexDirs = tableDataDir.listFiles(nonTmpFileFilter); - Assert.assertNotNull(indexDirs); - for (File indexDir : indexDirs) { - SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir); - for (String columnName : segmentMetadata.getSchema().getColumnNames()) { - Assert.assertTrue(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()); - } - } - - // Should create the task queues and generate a ConvertToRawIndexTask task with 5 child tasks - Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); - Assert.assertTrue(_helixTaskResourceManager.getTaskQueues() - .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE))); - - // Should generate one more ConvertToRawIndexTask task with 3 child tasks - Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); - - // Should not generate more tasks - Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); - - // Wait at most 600 seconds for all tasks COMPLETED and new segments refreshed - TestUtils.waitForCondition(input -> { - // Check task state - for (TaskState taskState : _helixTaskResourceManager.getTaskStates(ConvertToRawIndexTask.TASK_TYPE).values()) { - if (taskState != TaskState.COMPLETED) { - return false; - } - } - - // Check segment ZK metadata - for (SegmentZKMetadata segmentZKMetadata : _helixResourceManager.getSegmentsZKMetadata(offlineTableName)) { - Map<String, String> customMap = segmentZKMetadata.getCustomMap(); - if (customMap == null || customMap.size() != 1 || !customMap - .containsKey(ConvertToRawIndexTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) { - return false; - } - } - - // Check segment metadata - File[] indexDirs1 = tableDataDir.listFiles(nonTmpFileFilter); - Assert.assertNotNull(indexDirs1); - for (File indexDir : indexDirs1) { - SegmentMetadata segmentMetadata; - - // Segment metadata file might not exist if the segment is refreshing - try { - segmentMetadata = new SegmentMetadataImpl(indexDir); - } catch (Exception e) { - return false; - } - - // The columns in COLUMNS_TO_CONVERT should have raw index - List<String> rawIndexColumns = Arrays.asList(StringUtils.split(COLUMNS_TO_CONVERT, ',')); - for (String columnName : segmentMetadata.getSchema().getColumnNames()) { - if (rawIndexColumns.contains(columnName)) { - if (segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) { - return false; - } - } else { - if (!segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) { - return false; - } - } - } - } - - return true; - }, 600_000L, "Failed to get all tasks COMPLETED and new segments refreshed"); - } - - @Test - public void testPinotHelixResourceManagerAPIs() { - // Instance APIs - Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 5); - Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 5); - Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), 0); - Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0); - - // Table APIs - String rawTableName = getTableName(); - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); - String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); - List<String> tableNames = _helixResourceManager.getAllTables(); - Assert.assertEquals(tableNames.size(), 2); - Assert.assertTrue(tableNames.contains(offlineTableName)); - Assert.assertTrue(tableNames.contains(realtimeTableName)); - Assert.assertEquals(_helixResourceManager.getAllRawTables(), Collections.singletonList(rawTableName)); - Assert.assertEquals(_helixResourceManager.getAllRealtimeTables(), Collections.singletonList(realtimeTableName)); - - // Tenant APIs - Assert.assertEquals(_helixResourceManager.getAllBrokerTenantNames(), Collections.singleton("TestTenant")); - Assert.assertEquals(_helixResourceManager.getAllServerTenantNames(), Collections.singleton("TestTenant")); - } - - @AfterClass - public void tearDown() - throws Exception { - stopMinion(); - - super.tearDown(); - } -} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java deleted file mode 100644 index 21ea6467c7..0000000000 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.minion.tasks.converttorawindex; - -import java.io.File; -import java.util.Collections; -import java.util.Map; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; -import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.core.minion.PinotTaskConfig; -import org.apache.pinot.core.minion.RawIndexConverter; -import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; -import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; -import org.apache.pinot.segment.spi.index.IndexingOverrides; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; - - -public class ConvertToRawIndexTaskExecutor extends BaseSingleSegmentConversionExecutor { - - @Override - protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) - throws Exception { - Map<String, String> configs = pinotTaskConfig.getConfigs(); - String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - _eventObserver.notifyProgress(pinotTaskConfig, "Converting segment: " + indexDir); - new RawIndexConverter(rawTableName, indexDir, workingDir, - configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY), - IndexingOverrides.getIndexCreatorProvider()).convert(); - return new SegmentConversionResult.Builder().setFile(workingDir) - .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY)) - .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build(); - } - - @Override - protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, - SegmentConversionResult segmentConversionResult) { - return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections - .singletonMap(MinionConstants.ConvertToRawIndexTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, - String.valueOf(System.currentTimeMillis()))); - } -} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java deleted file mode 100644 index 0b208c483a..0000000000 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.minion.tasks.converttorawindex; - -import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.minion.MinionConf; -import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; -import org.apache.pinot.minion.executor.PinotTaskExecutor; -import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; -import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory; - - -@TaskExecutorFactory -public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFactory { - - @Override - public void init(MinionTaskZkMetadataManager zkMetadataManager) { - } - - @Override - public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) { - } - - @Override - public String getTaskType() { - return MinionConstants.ConvertToRawIndexTask.TASK_TYPE; - } - - @Override - public PinotTaskExecutor create() { - return new ConvertToRawIndexTaskExecutor(); - } -} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java deleted file mode 100644 index 0d0d7699f7..0000000000 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.minion.tasks.converttorawindex; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.pinot.common.data.Segment; -import org.apache.pinot.common.lineage.SegmentLineage; -import org.apache.pinot.common.lineage.SegmentLineageUtils; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; -import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; -import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.core.minion.PinotTaskConfig; -import org.apache.pinot.spi.annotations.minion.TaskGenerator; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableTaskConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -@TaskGenerator -public class ConvertToRawIndexTaskGenerator extends BaseTaskGenerator { - private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class); - - @Override - public String getTaskType() { - return MinionConstants.ConvertToRawIndexTask.TASK_TYPE; - } - - @Override - public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); - - // Get the segments that are being converted so that we don't submit them again - Set<Segment> runningSegments = - TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoAccessor); - - for (TableConfig tableConfig : tableConfigs) { - // Only generate tasks for OFFLINE tables - String offlineTableName = tableConfig.getTableName(); - if (tableConfig.getTableType() != TableType.OFFLINE) { - LOGGER.warn("Skip generating ConvertToRawIndexTask for non-OFFLINE table: {}", offlineTableName); - continue; - } - - TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); - Preconditions.checkNotNull(tableTaskConfig); - Map<String, String> taskConfigs = - tableTaskConfig.getConfigsForTaskType(MinionConstants.ConvertToRawIndexTask.TASK_TYPE); - Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: {}", offlineTableName); - - // Get max number of tasks for this table - int tableMaxNumTasks; - String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); - if (tableMaxNumTasksConfig != null) { - try { - tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); - } catch (Exception e) { - tableMaxNumTasks = Integer.MAX_VALUE; - } - } else { - tableMaxNumTasks = Integer.MAX_VALUE; - } - - // Get the config for columns to convert - String columnsToConvertConfig = taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY); - - // Generate tasks - List<SegmentZKMetadata> offlineSegmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName); - SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(offlineTableName); - Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>(); - for (SegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentsZKMetadata) { - preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName()); - } - SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage); - - int tableNumTasks = 0; - for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) { - // Generate up to tableMaxNumTasks tasks each time for each table - if (tableNumTasks == tableMaxNumTasks) { - break; - } - - // Skip segments that are already submitted - String segmentName = segmentZKMetadata.getSegmentName(); - if (runningSegments.contains(new Segment(offlineTableName, segmentName))) { - continue; - } - - // Skip segments based on lineage: for COMPLETED lineage, segments in `segmentsFrom` will be removed by - // retention manager, for IN_PROGRESS lineage, segments in `segmentsTo` are uploaded yet - if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) { - continue; - } - - // Only submit segments that have not been converted - Map<String, String> customMap = segmentZKMetadata.getCustomMap(); - if (customMap == null || !customMap.containsKey( - MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + MinionConstants.TASK_TIME_SUFFIX)) { - Map<String, String> configs = new HashMap<>(); - configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName); - configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName); - configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl()); - configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); - configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc())); - if (columnsToConvertConfig != null) { - configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig); - } - pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, configs)); - tableNumTasks++; - } - } - } - - return pinotTaskConfigs; - } -} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java deleted file mode 100644 index 53d456d9f6..0000000000 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.minion.tasks.converttorawindex; - -import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory; -import org.apache.pinot.spi.annotations.minion.EventObserverFactory; - - -@EventObserverFactory -public class ConvertToRowIndexTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { - - @Override - public String getTaskType() { - return MinionConstants.ConvertToRawIndexTask.TASK_TYPE; - } -} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java index 667625af33..6abbf1eea6 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java @@ -21,38 +21,36 @@ package org.apache.pinot.plugin.minion.tasks; import java.util.Set; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry; -import org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskExecutorFactory; -import org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskGenerator; import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskExecutorFactory; import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskGenerator; import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutorFactory; +import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskGenerator; import org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutorFactory; import org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator; import org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskExecutorFactory; import org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskGenerator; -import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.assertTrue; + public class TaskRegistryTest { + @Test public void testTaskGeneratorRegistry() { Set<Class<?>> classes = TaskGeneratorRegistry.getTaskGeneratorClasses(); - Assert.assertTrue(classes.size() >= 4); - Assert.assertTrue(classes.contains(ConvertToRawIndexTaskGenerator.class)); - Assert.assertTrue(classes.contains(MergeRollupTaskGenerator.class)); - Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class)); - Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class)); + assertTrue(classes.contains(MergeRollupTaskGenerator.class)); + assertTrue(classes.contains(PurgeTaskGenerator.class)); + assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class)); + assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class)); } @Test public void testTaskExecutorRegistry() { Set<Class<?>> classes = TaskExecutorFactoryRegistry.getTaskExecutorFactoryClasses(); - Assert.assertTrue(classes.size() >= 5); - Assert.assertTrue(classes.contains(ConvertToRawIndexTaskExecutorFactory.class)); - Assert.assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class)); - Assert.assertTrue(classes.contains(PurgeTaskExecutorFactory.class)); - Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class)); - Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class)); + assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class)); + assertTrue(classes.contains(PurgeTaskExecutorFactory.class)); + assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class)); + assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org