This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 782c3c2df5 Remove ConvertToRawIndexTask (#10068)
782c3c2df5 is described below
commit 782c3c2df59d2b173ba9ef595aeabd27cb00a332
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jan 6 08:32:46 2023 -0800
Remove ConvertToRawIndexTask (#10068)
---
.../apache/pinot/core/common/MinionConstants.java | 5 -
.../pinot/core/minion/RawIndexConverter.java | 263 ---------------------
...vertToRawIndexMinionClusterIntegrationTest.java | 211 -----------------
.../ConvertToRawIndexTaskExecutor.java | 58 -----
.../ConvertToRawIndexTaskExecutorFactory.java | 49 ----
.../ConvertToRawIndexTaskGenerator.java | 140 -----------
...nvertToRowIndexTaskProgressObserverFactory.java | 33 ---
.../plugin/minion/tasks/TaskRegistryTest.java | 26 +-
8 files changed, 12 insertions(+), 773 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index c4007ee561..86bfa4829d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -59,11 +59,6 @@ public class MinionConstants {
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY =
"enableReplaceSegments";
- public static class ConvertToRawIndexTask {
- public static final String TASK_TYPE = "ConvertToRawIndexTask";
- public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert";
- }
-
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
deleted file mode 100644
index 49a786c340..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.CrcUtils;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
-import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider;
-import org.apache.pinot.segment.spi.creator.IndexCreationContext;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>RawIndexConverter</code> class takes a segment and converts the
dictionary-based indexes inside the segment
- * into raw indexes.
- * <ul>
- * <li>
- * If columns to convert are specified, check whether their
dictionary-based indexes exist and convert them.
- * </li>
- * <li>
- * If not specified, for each metric column, calculate the size of
dictionary-based index and uncompressed raw
- * index. If the size of raw index is smaller or equal to (size of
dictionary-based index * CONVERSION_THRESHOLD),
- * convert it.
- * </li>
- * </ul>
- * <p>After the conversion, add "rawIndex" into the segment metadata
"optimizations" field.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class RawIndexConverter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RawIndexConverter.class);
-
- // Threshold for the ratio of uncompressed raw index size and
dictionary-based index size to trigger conversion
- private static final int CONVERSION_THRESHOLD = 4;
-
- // BITS_PER_ELEMENT is not applicable for raw index
- private static final int BITS_PER_ELEMENT_FOR_RAW_INDEX = -1;
-
- private final String _rawTableName;
- private final ImmutableSegment _originalImmutableSegment;
- private final SegmentMetadata _originalSegmentMetadata;
- private final File _convertedIndexDir;
- private final PropertiesConfiguration _convertedProperties;
- private final String _columnsToConvert;
- private final ForwardIndexCreatorProvider _indexCreatorProvider;
-
- /**
- * NOTE: original segment should be in V1 format.
- * TODO: support V3 format
- */
- public RawIndexConverter(String rawTableName, File originalIndexDir, File
convertedIndexDir,
- @Nullable String columnsToConvert, ForwardIndexCreatorProvider
indexCreatorProvider)
- throws Exception {
- FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
- IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
- indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
- indexLoadingConfig.setReadMode(ReadMode.mmap);
- _rawTableName = rawTableName;
- _originalImmutableSegment = ImmutableSegmentLoader.load(originalIndexDir,
indexLoadingConfig, null, false);
- _originalSegmentMetadata = _originalImmutableSegment.getSegmentMetadata();
- _convertedIndexDir = convertedIndexDir;
- _convertedProperties =
- new PropertiesConfiguration(new File(_convertedIndexDir,
V1Constants.MetadataKeys.METADATA_FILE_NAME));
- _columnsToConvert = columnsToConvert;
- _indexCreatorProvider = indexCreatorProvider;
- }
-
- public boolean convert()
- throws Exception {
- String segmentName = _originalSegmentMetadata.getName();
- LOGGER.info("Start converting segment: {} in table: {}", segmentName,
_rawTableName);
-
- List<FieldSpec> columnsToConvert = new ArrayList<>();
- Schema schema = _originalSegmentMetadata.getSchema();
- if (_columnsToConvert == null) {
- LOGGER.info("Columns to convert are not specified, check each metric
column");
- for (MetricFieldSpec metricFieldSpec : schema.getMetricFieldSpecs()) {
- if
(_originalSegmentMetadata.getColumnMetadataFor(metricFieldSpec.getName()).hasDictionary()
- && shouldConvertColumn(metricFieldSpec)) {
- columnsToConvert.add(metricFieldSpec);
- }
- }
- } else {
- LOGGER.info("Columns to convert: {}", _columnsToConvert);
- for (String columnToConvert : StringUtils.split(_columnsToConvert, ','))
{
- FieldSpec fieldSpec = schema.getFieldSpecFor(columnToConvert);
- if (fieldSpec == null) {
- LOGGER.warn("Skip converting column: {} because is does not exist in
the schema", columnsToConvert);
- continue;
- }
- if (!fieldSpec.isSingleValueField()) {
- LOGGER.warn("Skip converting column: {} because it's a multi-value
column", columnsToConvert);
- continue;
- }
- if
(!_originalSegmentMetadata.getColumnMetadataFor(columnToConvert).hasDictionary())
{
- LOGGER.warn("Skip converting column: {} because its index is not
dictionary-based", columnsToConvert);
- continue;
- }
- columnsToConvert.add(fieldSpec);
- }
- }
-
- if (columnsToConvert.isEmpty()) {
- LOGGER.info("No column converted for segment: {} in table: {}",
segmentName, _rawTableName);
- return false;
- } else {
- // Convert columns
- for (FieldSpec columnToConvert : columnsToConvert) {
- convertColumn(columnToConvert);
- }
- _convertedProperties.save();
-
- // Update creation metadata with new computed CRC and original segment
creation time
- SegmentIndexCreationDriverImpl
- .persistCreationMeta(_convertedIndexDir,
CrcUtils.forAllFilesInFolder(_convertedIndexDir).computeCrc(),
- _originalSegmentMetadata.getIndexCreationTime());
-
- LOGGER.info("{} columns converted for segment: {} in table: {}",
columnsToConvert.size(), segmentName,
- _rawTableName);
- return true;
- }
- }
-
- private boolean shouldConvertColumn(FieldSpec fieldSpec) {
- String columnName = fieldSpec.getName();
- DataType storedType = fieldSpec.getDataType().getStoredType();
- int numTotalDocs = _originalSegmentMetadata.getTotalDocs();
- ColumnMetadata columnMetadata =
_originalSegmentMetadata.getColumnMetadataFor(columnName);
-
- int cardinality = columnMetadata.getCardinality();
-
- // In bits
- int lengthOfEachEntry;
- if (storedType.isFixedWidth()) {
- lengthOfEachEntry = storedType.size() * Byte.SIZE;
- } else {
- lengthOfEachEntry = columnMetadata.getColumnMaxLength() * Byte.SIZE;
- }
- long dictionaryBasedIndexSize =
- (long) numTotalDocs * columnMetadata.getBitsPerElement() + (long)
cardinality * lengthOfEachEntry;
- long rawIndexSize = (long) numTotalDocs * lengthOfEachEntry;
- LOGGER.info(
- "For column: {}, size of dictionary based index: {} bits, size of raw
index (without compression): {} bits",
- columnName, dictionaryBasedIndexSize, rawIndexSize);
-
- return rawIndexSize <= dictionaryBasedIndexSize * CONVERSION_THRESHOLD;
- }
-
- private void convertColumn(FieldSpec fieldSpec)
- throws Exception {
- String columnName = fieldSpec.getName();
- LOGGER.info("Converting column: {}", columnName);
-
- // Delete dictionary and existing indexes
- FileUtils.deleteQuietly(new File(_convertedIndexDir, columnName +
V1Constants.Dict.FILE_EXTENSION));
- FileUtils.deleteQuietly(
- new File(_convertedIndexDir, columnName +
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION));
- FileUtils.deleteQuietly(
- new File(_convertedIndexDir, columnName +
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION));
- FileUtils.deleteQuietly(
- new File(_convertedIndexDir, columnName +
V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION));
-
- // Create the raw index
- DataSource dataSource =
_originalImmutableSegment.getDataSource(columnName);
- ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
- Preconditions.checkState(forwardIndexReader != null,
- "Forward index disabled for column: %s, raw index conversion operation
unsupported!", columnName);
- Dictionary dictionary = dataSource.getDictionary();
- assert dictionary != null;
- DataType storedType = dictionary.getValueType();
- int numDocs = _originalSegmentMetadata.getTotalDocs();
- ColumnMetadata columnMetadata =
_originalSegmentMetadata.getColumnMetadataFor(columnName);
- try (ForwardIndexCreator rawIndexCreator =
_indexCreatorProvider.newForwardIndexCreator(
-
IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
- .withFieldSpec(new DimensionFieldSpec(columnName, storedType,
columnMetadata.isSingleValue()))
-
.withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
- ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext()) {
- switch (storedType) {
- case INT:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- case LONG:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- case FLOAT:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- case DOUBLE:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- case STRING:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- case BYTES:
- for (int docId = 0; docId < numDocs; docId++) {
-
rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId,
readerContext)));
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
-
- // Update the segment metadata
- _convertedProperties.setProperty(
- V1Constants.MetadataKeys.Column.getKeyFor(columnName,
V1Constants.MetadataKeys.Column.HAS_DICTIONARY), false);
- _convertedProperties.setProperty(
- V1Constants.MetadataKeys.Column.getKeyFor(columnName,
V1Constants.MetadataKeys.Column.BITS_PER_ELEMENT),
- BITS_PER_ELEMENT_FOR_RAW_INDEX);
- }
-}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
deleted file mode 100644
index b1df10ae0b..0000000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.commons.lang.StringUtils;
-import org.apache.helix.task.TaskState;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
-import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.common.MinionConstants.ConvertToRawIndexTask;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test that extends HybridClusterIntegrationTest and add Minions
into the cluster to convert 3 metric
- * columns' index into raw index for OFFLINE segments.
- */
-public class ConvertToRawIndexMinionClusterIntegrationTest extends
HybridClusterIntegrationTest {
- private static final String COLUMNS_TO_CONVERT =
"ActualElapsedTime,ArrDelay,DepDelay,CRSDepTime";
-
- private PinotHelixTaskResourceManager _helixTaskResourceManager;
- private PinotTaskManager _taskManager;
-
- @Nullable
- @Override
- protected List<String> getNoDictionaryColumns() {
- return null;
- }
-
- // NOTE: Only allow converting raw index for v1 segment
- @Override
- protected String getSegmentVersion() {
- return SegmentVersion.v1.name();
- }
-
- @Override
- protected TableTaskConfig getTaskConfig() {
- Map<String, String> convertToRawIndexTaskConfigs = new HashMap<>();
- convertToRawIndexTaskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
"5");
-
convertToRawIndexTaskConfigs.put(ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY,
COLUMNS_TO_CONVERT);
- return new
TableTaskConfig(Collections.singletonMap(ConvertToRawIndexTask.TASK_TYPE,
convertToRawIndexTaskConfigs));
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- // The parent setUp() sets up Zookeeper, Kafka, controller, broker and
servers
- super.setUp();
-
- startMinion();
- _helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
- _taskManager = _controllerStarter.getTaskManager();
- }
-
- @Test
- public void testConvertToRawIndexTask()
- throws Exception {
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
-
- File testDataDir = new
File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR + "-0", offlineTableName);
- if (!testDataDir.isDirectory()) {
- testDataDir = new File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR
+ "-1", offlineTableName);
- }
- Assert.assertTrue(testDataDir.isDirectory());
- File tableDataDir = testDataDir;
-
- // Check that all columns have dictionary
- // Skip the tmp directory since these are only temporary segments
- FilenameFilter nonTmpFileFilter = new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return !name.equals("tmp");
- }
- };
- File[] indexDirs = tableDataDir.listFiles(nonTmpFileFilter);
- Assert.assertNotNull(indexDirs);
- for (File indexDir : indexDirs) {
- SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
- for (String columnName : segmentMetadata.getSchema().getColumnNames()) {
-
Assert.assertTrue(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary());
- }
- }
-
- // Should create the task queues and generate a ConvertToRawIndexTask task
with 5 child tasks
-
Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
- Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
-
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE)));
-
- // Should generate one more ConvertToRawIndexTask task with 3 child tasks
-
Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
-
- // Should not generate more tasks
-
Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
-
- // Wait at most 600 seconds for all tasks COMPLETED and new segments
refreshed
- TestUtils.waitForCondition(input -> {
- // Check task state
- for (TaskState taskState :
_helixTaskResourceManager.getTaskStates(ConvertToRawIndexTask.TASK_TYPE).values())
{
- if (taskState != TaskState.COMPLETED) {
- return false;
- }
- }
-
- // Check segment ZK metadata
- for (SegmentZKMetadata segmentZKMetadata :
_helixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
- Map<String, String> customMap = segmentZKMetadata.getCustomMap();
- if (customMap == null || customMap.size() != 1 || !customMap
- .containsKey(ConvertToRawIndexTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX)) {
- return false;
- }
- }
-
- // Check segment metadata
- File[] indexDirs1 = tableDataDir.listFiles(nonTmpFileFilter);
- Assert.assertNotNull(indexDirs1);
- for (File indexDir : indexDirs1) {
- SegmentMetadata segmentMetadata;
-
- // Segment metadata file might not exist if the segment is refreshing
- try {
- segmentMetadata = new SegmentMetadataImpl(indexDir);
- } catch (Exception e) {
- return false;
- }
-
- // The columns in COLUMNS_TO_CONVERT should have raw index
- List<String> rawIndexColumns =
Arrays.asList(StringUtils.split(COLUMNS_TO_CONVERT, ','));
- for (String columnName : segmentMetadata.getSchema().getColumnNames())
{
- if (rawIndexColumns.contains(columnName)) {
- if
(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
- return false;
- }
- } else {
- if
(!segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
- return false;
- }
- }
- }
- }
-
- return true;
- }, 600_000L, "Failed to get all tasks COMPLETED and new segments
refreshed");
- }
-
- @Test
- public void testPinotHelixResourceManagerAPIs() {
- // Instance APIs
- Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 5);
- Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(),
5);
-
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(),
0);
-
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(),
0);
-
- // Table APIs
- String rawTableName = getTableName();
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
- List<String> tableNames = _helixResourceManager.getAllTables();
- Assert.assertEquals(tableNames.size(), 2);
- Assert.assertTrue(tableNames.contains(offlineTableName));
- Assert.assertTrue(tableNames.contains(realtimeTableName));
- Assert.assertEquals(_helixResourceManager.getAllRawTables(),
Collections.singletonList(rawTableName));
- Assert.assertEquals(_helixResourceManager.getAllRealtimeTables(),
Collections.singletonList(realtimeTableName));
-
- // Tenant APIs
- Assert.assertEquals(_helixResourceManager.getAllBrokerTenantNames(),
Collections.singleton("TestTenant"));
- Assert.assertEquals(_helixResourceManager.getAllServerTenantNames(),
Collections.singleton("TestTenant"));
- }
-
- @AfterClass
- public void tearDown()
- throws Exception {
- stopMinion();
-
- super.tearDown();
- }
-}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
deleted file mode 100644
index 21ea6467c7..0000000000
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Map;
-import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.minion.RawIndexConverter;
-import
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
-import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.index.IndexingOverrides;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-
-
-public class ConvertToRawIndexTaskExecutor extends
BaseSingleSegmentConversionExecutor {
-
- @Override
- protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig,
File indexDir, File workingDir)
- throws Exception {
- Map<String, String> configs = pinotTaskConfig.getConfigs();
- String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- _eventObserver.notifyProgress(pinotTaskConfig, "Converting segment: " +
indexDir);
- new RawIndexConverter(rawTableName, indexDir, workingDir,
-
configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
- IndexingOverrides.getIndexCreatorProvider()).convert();
- return new SegmentConversionResult.Builder().setFile(workingDir)
- .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
- .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
- }
-
- @Override
- protected SegmentZKMetadataCustomMapModifier
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
- SegmentConversionResult segmentConversionResult) {
- return new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections
- .singletonMap(MinionConstants.ConvertToRawIndexTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX,
- String.valueOf(System.currentTimeMillis())));
- }
-}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
deleted file mode 100644
index 0b208c483a..0000000000
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.minion.MinionConf;
-import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
-import org.apache.pinot.minion.executor.PinotTaskExecutor;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
-import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
-
-
-@TaskExecutorFactory
-public class ConvertToRawIndexTaskExecutorFactory implements
PinotTaskExecutorFactory {
-
- @Override
- public void init(MinionTaskZkMetadataManager zkMetadataManager) {
- }
-
- @Override
- public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf
minionConf) {
- }
-
- @Override
- public String getTaskType() {
- return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
- }
-
- @Override
- public PinotTaskExecutor create() {
- return new ConvertToRawIndexTaskExecutor();
- }
-}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
deleted file mode 100644
index 0d0d7699f7..0000000000
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.common.data.Segment;
-import org.apache.pinot.common.lineage.SegmentLineage;
-import org.apache.pinot.common.lineage.SegmentLineageUtils;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
-import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.spi.annotations.minion.TaskGenerator;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-@TaskGenerator
-public class ConvertToRawIndexTaskGenerator extends BaseTaskGenerator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
-
- @Override
- public String getTaskType() {
- return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
- }
-
- @Override
- public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
- List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
-
- // Get the segments that are being converted so that we don't submit them
again
- Set<Segment> runningSegments =
-
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
_clusterInfoAccessor);
-
- for (TableConfig tableConfig : tableConfigs) {
- // Only generate tasks for OFFLINE tables
- String offlineTableName = tableConfig.getTableName();
- if (tableConfig.getTableType() != TableType.OFFLINE) {
- LOGGER.warn("Skip generating ConvertToRawIndexTask for non-OFFLINE
table: {}", offlineTableName);
- continue;
- }
-
- TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
- Preconditions.checkNotNull(tableTaskConfig);
- Map<String, String> taskConfigs =
-
tableTaskConfig.getConfigsForTaskType(MinionConstants.ConvertToRawIndexTask.TASK_TYPE);
- Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null
for Table: {}", offlineTableName);
-
- // Get max number of tasks for this table
- int tableMaxNumTasks;
- String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
- if (tableMaxNumTasksConfig != null) {
- try {
- tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
- } catch (Exception e) {
- tableMaxNumTasks = Integer.MAX_VALUE;
- }
- } else {
- tableMaxNumTasks = Integer.MAX_VALUE;
- }
-
- // Get the config for columns to convert
- String columnsToConvertConfig =
taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);
-
- // Generate tasks
- List<SegmentZKMetadata> offlineSegmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
- SegmentLineage segmentLineage =
_clusterInfoAccessor.getSegmentLineage(offlineTableName);
- Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
- for (SegmentZKMetadata offlineSegmentZKMetadata :
offlineSegmentsZKMetadata) {
-
preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
- }
-
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
segmentLineage);
-
- int tableNumTasks = 0;
- for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
- // Generate up to tableMaxNumTasks tasks each time for each table
- if (tableNumTasks == tableMaxNumTasks) {
- break;
- }
-
- // Skip segments that are already submitted
- String segmentName = segmentZKMetadata.getSegmentName();
- if (runningSegments.contains(new Segment(offlineTableName,
segmentName))) {
- continue;
- }
-
- // Skip segments based on lineage: for COMPLETED lineage, segments in
`segmentsFrom` will be removed by
- // retention manager, for IN_PROGRESS lineage, segments in
`segmentsTo` are uploaded yet
- if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
- continue;
- }
-
- // Only submit segments that have not been converted
- Map<String, String> customMap = segmentZKMetadata.getCustomMap();
- if (customMap == null || !customMap.containsKey(
- MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY +
MinionConstants.TASK_TIME_SUFFIX)) {
- Map<String, String> configs = new HashMap<>();
- configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
- configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
- configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
- configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
- configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(segmentZKMetadata.getCrc()));
- if (columnsToConvertConfig != null) {
-
configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY,
columnsToConvertConfig);
- }
- pinotTaskConfigs.add(new
PinotTaskConfig(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, configs));
- tableNumTasks++;
- }
- }
- }
-
- return pinotTaskConfigs;
- }
-}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java
deleted file mode 100644
index 53d456d9f6..0000000000
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
-import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
-
-
-@EventObserverFactory
-public class ConvertToRowIndexTaskProgressObserverFactory extends
BaseMinionProgressObserverFactory {
-
- @Override
- public String getTaskType() {
- return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
- }
-}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
index 667625af33..6abbf1eea6 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
@@ -21,38 +21,36 @@ package org.apache.pinot.plugin.minion.tasks;
import java.util.Set;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
-import
org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskExecutorFactory;
-import
org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskGenerator;
import
org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskExecutorFactory;
import
org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskGenerator;
import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutorFactory;
+import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskGenerator;
import
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutorFactory;
import
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator;
import
org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskExecutorFactory;
import
org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskGenerator;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertTrue;
+
public class TaskRegistryTest {
+
@Test
public void testTaskGeneratorRegistry() {
Set<Class<?>> classes = TaskGeneratorRegistry.getTaskGeneratorClasses();
- Assert.assertTrue(classes.size() >= 4);
- Assert.assertTrue(classes.contains(ConvertToRawIndexTaskGenerator.class));
- Assert.assertTrue(classes.contains(MergeRollupTaskGenerator.class));
-
Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class));
-
Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class));
+ assertTrue(classes.contains(MergeRollupTaskGenerator.class));
+ assertTrue(classes.contains(PurgeTaskGenerator.class));
+ assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class));
+ assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class));
}
@Test
public void testTaskExecutorRegistry() {
Set<Class<?>> classes =
TaskExecutorFactoryRegistry.getTaskExecutorFactoryClasses();
- Assert.assertTrue(classes.size() >= 5);
-
Assert.assertTrue(classes.contains(ConvertToRawIndexTaskExecutorFactory.class));
- Assert.assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class));
- Assert.assertTrue(classes.contains(PurgeTaskExecutorFactory.class));
-
Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class));
-
Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class));
+ assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class));
+ assertTrue(classes.contains(PurgeTaskExecutorFactory.class));
+
assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class));
+
assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]