This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 782c3c2df5 Remove ConvertToRawIndexTask (#10068)
782c3c2df5 is described below

commit 782c3c2df59d2b173ba9ef595aeabd27cb00a332
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Jan 6 08:32:46 2023 -0800

    Remove ConvertToRawIndexTask (#10068)
---
 .../apache/pinot/core/common/MinionConstants.java  |   5 -
 .../pinot/core/minion/RawIndexConverter.java       | 263 ---------------------
 ...vertToRawIndexMinionClusterIntegrationTest.java | 211 -----------------
 .../ConvertToRawIndexTaskExecutor.java             |  58 -----
 .../ConvertToRawIndexTaskExecutorFactory.java      |  49 ----
 .../ConvertToRawIndexTaskGenerator.java            | 140 -----------
 ...nvertToRowIndexTaskProgressObserverFactory.java |  33 ---
 .../plugin/minion/tasks/TaskRegistryTest.java      |  26 +-
 8 files changed, 12 insertions(+), 773 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index c4007ee561..86bfa4829d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -59,11 +59,6 @@ public class MinionConstants {
   public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
   public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
 
-  public static class ConvertToRawIndexTask {
-    public static final String TASK_TYPE = "ConvertToRawIndexTask";
-    public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert";
-  }
-
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
deleted file mode 100644
index 49a786c340..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.CrcUtils;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
-import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider;
-import org.apache.pinot.segment.spi.creator.IndexCreationContext;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>RawIndexConverter</code> class takes a segment and converts the 
dictionary-based indexes inside the segment
- * into raw indexes.
- * <ul>
- *   <li>
- *     If columns to convert are specified, check whether their 
dictionary-based indexes exist and convert them.
- *   </li>
- *   <li>
- *     If not specified, for each metric column, calculate the size of 
dictionary-based index and uncompressed raw
- *     index. If the size of raw index is smaller or equal to (size of 
dictionary-based index * CONVERSION_THRESHOLD),
- *     convert it.
- *   </li>
- * </ul>
- * <p>After the conversion, add "rawIndex" into the segment metadata 
"optimizations" field.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class RawIndexConverter {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(RawIndexConverter.class);
-
-  // Threshold for the ratio of uncompressed raw index size and 
dictionary-based index size to trigger conversion
-  private static final int CONVERSION_THRESHOLD = 4;
-
-  // BITS_PER_ELEMENT is not applicable for raw index
-  private static final int BITS_PER_ELEMENT_FOR_RAW_INDEX = -1;
-
-  private final String _rawTableName;
-  private final ImmutableSegment _originalImmutableSegment;
-  private final SegmentMetadata _originalSegmentMetadata;
-  private final File _convertedIndexDir;
-  private final PropertiesConfiguration _convertedProperties;
-  private final String _columnsToConvert;
-  private final ForwardIndexCreatorProvider _indexCreatorProvider;
-
-  /**
-   * NOTE: original segment should be in V1 format.
-   * TODO: support V3 format
-   */
-  public RawIndexConverter(String rawTableName, File originalIndexDir, File 
convertedIndexDir,
-      @Nullable String columnsToConvert, ForwardIndexCreatorProvider 
indexCreatorProvider)
-      throws Exception {
-    FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    _rawTableName = rawTableName;
-    _originalImmutableSegment = ImmutableSegmentLoader.load(originalIndexDir, 
indexLoadingConfig, null, false);
-    _originalSegmentMetadata = _originalImmutableSegment.getSegmentMetadata();
-    _convertedIndexDir = convertedIndexDir;
-    _convertedProperties =
-        new PropertiesConfiguration(new File(_convertedIndexDir, 
V1Constants.MetadataKeys.METADATA_FILE_NAME));
-    _columnsToConvert = columnsToConvert;
-    _indexCreatorProvider = indexCreatorProvider;
-  }
-
-  public boolean convert()
-      throws Exception {
-    String segmentName = _originalSegmentMetadata.getName();
-    LOGGER.info("Start converting segment: {} in table: {}", segmentName, 
_rawTableName);
-
-    List<FieldSpec> columnsToConvert = new ArrayList<>();
-    Schema schema = _originalSegmentMetadata.getSchema();
-    if (_columnsToConvert == null) {
-      LOGGER.info("Columns to convert are not specified, check each metric 
column");
-      for (MetricFieldSpec metricFieldSpec : schema.getMetricFieldSpecs()) {
-        if 
(_originalSegmentMetadata.getColumnMetadataFor(metricFieldSpec.getName()).hasDictionary()
-            && shouldConvertColumn(metricFieldSpec)) {
-          columnsToConvert.add(metricFieldSpec);
-        }
-      }
-    } else {
-      LOGGER.info("Columns to convert: {}", _columnsToConvert);
-      for (String columnToConvert : StringUtils.split(_columnsToConvert, ',')) 
{
-        FieldSpec fieldSpec = schema.getFieldSpecFor(columnToConvert);
-        if (fieldSpec == null) {
-          LOGGER.warn("Skip converting column: {} because is does not exist in 
the schema", columnsToConvert);
-          continue;
-        }
-        if (!fieldSpec.isSingleValueField()) {
-          LOGGER.warn("Skip converting column: {} because it's a multi-value 
column", columnsToConvert);
-          continue;
-        }
-        if 
(!_originalSegmentMetadata.getColumnMetadataFor(columnToConvert).hasDictionary())
 {
-          LOGGER.warn("Skip converting column: {} because its index is not 
dictionary-based", columnsToConvert);
-          continue;
-        }
-        columnsToConvert.add(fieldSpec);
-      }
-    }
-
-    if (columnsToConvert.isEmpty()) {
-      LOGGER.info("No column converted for segment: {} in table: {}", 
segmentName, _rawTableName);
-      return false;
-    } else {
-      // Convert columns
-      for (FieldSpec columnToConvert : columnsToConvert) {
-        convertColumn(columnToConvert);
-      }
-      _convertedProperties.save();
-
-      // Update creation metadata with new computed CRC and original segment 
creation time
-      SegmentIndexCreationDriverImpl
-          .persistCreationMeta(_convertedIndexDir, 
CrcUtils.forAllFilesInFolder(_convertedIndexDir).computeCrc(),
-              _originalSegmentMetadata.getIndexCreationTime());
-
-      LOGGER.info("{} columns converted for segment: {} in table: {}", 
columnsToConvert.size(), segmentName,
-          _rawTableName);
-      return true;
-    }
-  }
-
-  private boolean shouldConvertColumn(FieldSpec fieldSpec) {
-    String columnName = fieldSpec.getName();
-    DataType storedType = fieldSpec.getDataType().getStoredType();
-    int numTotalDocs = _originalSegmentMetadata.getTotalDocs();
-    ColumnMetadata columnMetadata = 
_originalSegmentMetadata.getColumnMetadataFor(columnName);
-
-    int cardinality = columnMetadata.getCardinality();
-
-    // In bits
-    int lengthOfEachEntry;
-    if (storedType.isFixedWidth()) {
-      lengthOfEachEntry = storedType.size() * Byte.SIZE;
-    } else {
-      lengthOfEachEntry = columnMetadata.getColumnMaxLength() * Byte.SIZE;
-    }
-    long dictionaryBasedIndexSize =
-        (long) numTotalDocs * columnMetadata.getBitsPerElement() + (long) 
cardinality * lengthOfEachEntry;
-    long rawIndexSize = (long) numTotalDocs * lengthOfEachEntry;
-    LOGGER.info(
-        "For column: {}, size of dictionary based index: {} bits, size of raw 
index (without compression): {} bits",
-        columnName, dictionaryBasedIndexSize, rawIndexSize);
-
-    return rawIndexSize <= dictionaryBasedIndexSize * CONVERSION_THRESHOLD;
-  }
-
-  private void convertColumn(FieldSpec fieldSpec)
-      throws Exception {
-    String columnName = fieldSpec.getName();
-    LOGGER.info("Converting column: {}", columnName);
-
-    // Delete dictionary and existing indexes
-    FileUtils.deleteQuietly(new File(_convertedIndexDir, columnName + 
V1Constants.Dict.FILE_EXTENSION));
-    FileUtils.deleteQuietly(
-        new File(_convertedIndexDir, columnName + 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION));
-    FileUtils.deleteQuietly(
-        new File(_convertedIndexDir, columnName + 
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION));
-    FileUtils.deleteQuietly(
-        new File(_convertedIndexDir, columnName + 
V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION));
-
-    // Create the raw index
-    DataSource dataSource = 
_originalImmutableSegment.getDataSource(columnName);
-    ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
-    Preconditions.checkState(forwardIndexReader != null,
-        "Forward index disabled for column: %s, raw index conversion operation 
unsupported!", columnName);
-    Dictionary dictionary = dataSource.getDictionary();
-    assert dictionary != null;
-    DataType storedType = dictionary.getValueType();
-    int numDocs = _originalSegmentMetadata.getTotalDocs();
-    ColumnMetadata columnMetadata = 
_originalSegmentMetadata.getColumnMetadataFor(columnName);
-    try (ForwardIndexCreator rawIndexCreator = 
_indexCreatorProvider.newForwardIndexCreator(
-        
IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
-            .withFieldSpec(new DimensionFieldSpec(columnName, storedType, 
columnMetadata.isSingleValue()))
-            
.withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
-        ForwardIndexReaderContext readerContext = 
forwardIndexReader.createContext()) {
-      switch (storedType) {
-        case INT:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        case LONG:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        case FLOAT:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        case DOUBLE:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        case STRING:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        case BYTES:
-          for (int docId = 0; docId < numDocs; docId++) {
-            
rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId,
 readerContext)));
-          }
-          break;
-        default:
-          throw new IllegalStateException();
-      }
-    }
-
-    // Update the segment metadata
-    _convertedProperties.setProperty(
-        V1Constants.MetadataKeys.Column.getKeyFor(columnName, 
V1Constants.MetadataKeys.Column.HAS_DICTIONARY), false);
-    _convertedProperties.setProperty(
-        V1Constants.MetadataKeys.Column.getKeyFor(columnName, 
V1Constants.MetadataKeys.Column.BITS_PER_ELEMENT),
-        BITS_PER_ELEMENT_FOR_RAW_INDEX);
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
deleted file mode 100644
index b1df10ae0b..0000000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.commons.lang.StringUtils;
-import org.apache.helix.task.TaskState;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
-import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.common.MinionConstants.ConvertToRawIndexTask;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test that extends HybridClusterIntegrationTest and add Minions 
into the cluster to convert 3 metric
- * columns' index into raw index for OFFLINE segments.
- */
-public class ConvertToRawIndexMinionClusterIntegrationTest extends 
HybridClusterIntegrationTest {
-  private static final String COLUMNS_TO_CONVERT = 
"ActualElapsedTime,ArrDelay,DepDelay,CRSDepTime";
-
-  private PinotHelixTaskResourceManager _helixTaskResourceManager;
-  private PinotTaskManager _taskManager;
-
-  @Nullable
-  @Override
-  protected List<String> getNoDictionaryColumns() {
-    return null;
-  }
-
-  // NOTE: Only allow converting raw index for v1 segment
-  @Override
-  protected String getSegmentVersion() {
-    return SegmentVersion.v1.name();
-  }
-
-  @Override
-  protected TableTaskConfig getTaskConfig() {
-    Map<String, String> convertToRawIndexTaskConfigs = new HashMap<>();
-    convertToRawIndexTaskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, 
"5");
-    
convertToRawIndexTaskConfigs.put(ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, 
COLUMNS_TO_CONVERT);
-    return new 
TableTaskConfig(Collections.singletonMap(ConvertToRawIndexTask.TASK_TYPE, 
convertToRawIndexTaskConfigs));
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    // The parent setUp() sets up Zookeeper, Kafka, controller, broker and 
servers
-    super.setUp();
-
-    startMinion();
-    _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
-    _taskManager = _controllerStarter.getTaskManager();
-  }
-
-  @Test
-  public void testConvertToRawIndexTask()
-      throws Exception {
-    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
-
-    File testDataDir = new 
File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR + "-0", offlineTableName);
-    if (!testDataDir.isDirectory()) {
-      testDataDir = new File(CommonConstants.Server.DEFAULT_INSTANCE_DATA_DIR 
+ "-1", offlineTableName);
-    }
-    Assert.assertTrue(testDataDir.isDirectory());
-    File tableDataDir = testDataDir;
-
-    // Check that all columns have dictionary
-    // Skip the tmp directory since these are only temporary segments
-    FilenameFilter nonTmpFileFilter = new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return !name.equals("tmp");
-      }
-    };
-    File[] indexDirs = tableDataDir.listFiles(nonTmpFileFilter);
-    Assert.assertNotNull(indexDirs);
-    for (File indexDir : indexDirs) {
-      SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
-      for (String columnName : segmentMetadata.getSchema().getColumnNames()) {
-        
Assert.assertTrue(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary());
-      }
-    }
-
-    // Should create the task queues and generate a ConvertToRawIndexTask task 
with 5 child tasks
-    
Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
-    Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
-        
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE)));
-
-    // Should generate one more ConvertToRawIndexTask task with 3 child tasks
-    
Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
-
-    // Should not generate more tasks
-    
Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
-
-    // Wait at most 600 seconds for all tasks COMPLETED and new segments 
refreshed
-    TestUtils.waitForCondition(input -> {
-      // Check task state
-      for (TaskState taskState : 
_helixTaskResourceManager.getTaskStates(ConvertToRawIndexTask.TASK_TYPE).values())
 {
-        if (taskState != TaskState.COMPLETED) {
-          return false;
-        }
-      }
-
-      // Check segment ZK metadata
-      for (SegmentZKMetadata segmentZKMetadata : 
_helixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
-        Map<String, String> customMap = segmentZKMetadata.getCustomMap();
-        if (customMap == null || customMap.size() != 1 || !customMap
-            .containsKey(ConvertToRawIndexTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) {
-          return false;
-        }
-      }
-
-      // Check segment metadata
-      File[] indexDirs1 = tableDataDir.listFiles(nonTmpFileFilter);
-      Assert.assertNotNull(indexDirs1);
-      for (File indexDir : indexDirs1) {
-        SegmentMetadata segmentMetadata;
-
-        // Segment metadata file might not exist if the segment is refreshing
-        try {
-          segmentMetadata = new SegmentMetadataImpl(indexDir);
-        } catch (Exception e) {
-          return false;
-        }
-
-        // The columns in COLUMNS_TO_CONVERT should have raw index
-        List<String> rawIndexColumns = 
Arrays.asList(StringUtils.split(COLUMNS_TO_CONVERT, ','));
-        for (String columnName : segmentMetadata.getSchema().getColumnNames()) 
{
-          if (rawIndexColumns.contains(columnName)) {
-            if 
(segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
-              return false;
-            }
-          } else {
-            if 
(!segmentMetadata.getColumnMetadataFor(columnName).hasDictionary()) {
-              return false;
-            }
-          }
-        }
-      }
-
-      return true;
-    }, 600_000L, "Failed to get all tasks COMPLETED and new segments 
refreshed");
-  }
-
-  @Test
-  public void testPinotHelixResourceManagerAPIs() {
-    // Instance APIs
-    Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 5);
-    Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 
5);
-    
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(),
 0);
-    
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(),
 0);
-
-    // Table APIs
-    String rawTableName = getTableName();
-    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-    List<String> tableNames = _helixResourceManager.getAllTables();
-    Assert.assertEquals(tableNames.size(), 2);
-    Assert.assertTrue(tableNames.contains(offlineTableName));
-    Assert.assertTrue(tableNames.contains(realtimeTableName));
-    Assert.assertEquals(_helixResourceManager.getAllRawTables(), 
Collections.singletonList(rawTableName));
-    Assert.assertEquals(_helixResourceManager.getAllRealtimeTables(), 
Collections.singletonList(realtimeTableName));
-
-    // Tenant APIs
-    Assert.assertEquals(_helixResourceManager.getAllBrokerTenantNames(), 
Collections.singleton("TestTenant"));
-    Assert.assertEquals(_helixResourceManager.getAllServerTenantNames(), 
Collections.singleton("TestTenant"));
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    stopMinion();
-
-    super.tearDown();
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
deleted file mode 100644
index 21ea6467c7..0000000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Map;
-import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.minion.RawIndexConverter;
-import 
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
-import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.index.IndexingOverrides;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-
-
-public class ConvertToRawIndexTaskExecutor extends 
BaseSingleSegmentConversionExecutor {
-
-  @Override
-  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, 
File indexDir, File workingDir)
-      throws Exception {
-    Map<String, String> configs = pinotTaskConfig.getConfigs();
-    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    _eventObserver.notifyProgress(pinotTaskConfig, "Converting segment: " + 
indexDir);
-    new RawIndexConverter(rawTableName, indexDir, workingDir,
-        
configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
-        IndexingOverrides.getIndexCreatorProvider()).convert();
-    return new SegmentConversionResult.Builder().setFile(workingDir)
-        .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
-        .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
-  }
-
-  @Override
-  protected SegmentZKMetadataCustomMapModifier 
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
-      SegmentConversionResult segmentConversionResult) {
-    return new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
 Collections
-        .singletonMap(MinionConstants.ConvertToRawIndexTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX,
-            String.valueOf(System.currentTimeMillis())));
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
deleted file mode 100644
index 0b208c483a..0000000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.minion.MinionConf;
-import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
-import org.apache.pinot.minion.executor.PinotTaskExecutor;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
-import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
-
-
-@TaskExecutorFactory
-public class ConvertToRawIndexTaskExecutorFactory implements 
PinotTaskExecutorFactory {
-
-  @Override
-  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
-  }
-
-  @Override
-  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf 
minionConf) {
-  }
-
-  @Override
-  public String getTaskType() {
-    return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
-  }
-
-  @Override
-  public PinotTaskExecutor create() {
-    return new ConvertToRawIndexTaskExecutor();
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
deleted file mode 100644
index 0d0d7699f7..0000000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.common.data.Segment;
-import org.apache.pinot.common.lineage.SegmentLineage;
-import org.apache.pinot.common.lineage.SegmentLineageUtils;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
-import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.spi.annotations.minion.TaskGenerator;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-@TaskGenerator
-public class ConvertToRawIndexTaskGenerator extends BaseTaskGenerator {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
-
-  @Override
-  public String getTaskType() {
-    return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
-  }
-
-  @Override
-  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
-    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
-
-    // Get the segments that are being converted so that we don't submit them 
again
-    Set<Segment> runningSegments =
-        
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
 _clusterInfoAccessor);
-
-    for (TableConfig tableConfig : tableConfigs) {
-      // Only generate tasks for OFFLINE tables
-      String offlineTableName = tableConfig.getTableName();
-      if (tableConfig.getTableType() != TableType.OFFLINE) {
-        LOGGER.warn("Skip generating ConvertToRawIndexTask for non-OFFLINE 
table: {}", offlineTableName);
-        continue;
-      }
-
-      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
-      Preconditions.checkNotNull(tableTaskConfig);
-      Map<String, String> taskConfigs =
-          
tableTaskConfig.getConfigsForTaskType(MinionConstants.ConvertToRawIndexTask.TASK_TYPE);
-      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null 
for Table: {}", offlineTableName);
-
-      // Get max number of tasks for this table
-      int tableMaxNumTasks;
-      String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
-      if (tableMaxNumTasksConfig != null) {
-        try {
-          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
-        } catch (Exception e) {
-          tableMaxNumTasks = Integer.MAX_VALUE;
-        }
-      } else {
-        tableMaxNumTasks = Integer.MAX_VALUE;
-      }
-
-      // Get the config for columns to convert
-      String columnsToConvertConfig = 
taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);
-
-      // Generate tasks
-      List<SegmentZKMetadata> offlineSegmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
-      SegmentLineage segmentLineage = 
_clusterInfoAccessor.getSegmentLineage(offlineTableName);
-      Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
-      for (SegmentZKMetadata offlineSegmentZKMetadata : 
offlineSegmentsZKMetadata) {
-        
preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
-      }
-      
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
 segmentLineage);
-
-      int tableNumTasks = 0;
-      for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
-        // Generate up to tableMaxNumTasks tasks each time for each table
-        if (tableNumTasks == tableMaxNumTasks) {
-          break;
-        }
-
-        // Skip segments that are already submitted
-        String segmentName = segmentZKMetadata.getSegmentName();
-        if (runningSegments.contains(new Segment(offlineTableName, 
segmentName))) {
-          continue;
-        }
-
-        // Skip segments based on lineage: for COMPLETED lineage, segments in 
`segmentsFrom` will be removed by
-        // retention manager, for IN_PROGRESS lineage, segments in 
`segmentsTo` are uploaded yet
-        if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
-          continue;
-        }
-
-        // Only submit segments that have not been converted
-        Map<String, String> customMap = segmentZKMetadata.getCustomMap();
-        if (customMap == null || !customMap.containsKey(
-            MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + 
MinionConstants.TASK_TIME_SUFFIX)) {
-          Map<String, String> configs = new HashMap<>();
-          configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
-          configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
-          configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
-          configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
-          configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segmentZKMetadata.getCrc()));
-          if (columnsToConvertConfig != null) {
-            
configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, 
columnsToConvertConfig);
-          }
-          pinotTaskConfigs.add(new 
PinotTaskConfig(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, configs));
-          tableNumTasks++;
-        }
-      }
-    }
-
-    return pinotTaskConfigs;
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java
deleted file mode 100644
index 53d456d9f6..0000000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRowIndexTaskProgressObserverFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.converttorawindex;
-
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
-import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
-
-
-@EventObserverFactory
-public class ConvertToRowIndexTaskProgressObserverFactory extends 
BaseMinionProgressObserverFactory {
-
-  @Override
-  public String getTaskType() {
-    return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
index 667625af33..6abbf1eea6 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/TaskRegistryTest.java
@@ -21,38 +21,36 @@ package org.apache.pinot.plugin.minion.tasks;
 import java.util.Set;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
-import 
org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskExecutorFactory;
-import 
org.apache.pinot.plugin.minion.tasks.converttorawindex.ConvertToRawIndexTaskGenerator;
 import 
org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskExecutorFactory;
 import 
org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskGenerator;
 import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutorFactory;
+import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskGenerator;
 import 
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutorFactory;
 import 
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator;
 import 
org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskExecutorFactory;
 import 
org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskGenerator;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertTrue;
+
 
 public class TaskRegistryTest {
+
   @Test
   public void testTaskGeneratorRegistry() {
     Set<Class<?>> classes = TaskGeneratorRegistry.getTaskGeneratorClasses();
-    Assert.assertTrue(classes.size() >= 4);
-    Assert.assertTrue(classes.contains(ConvertToRawIndexTaskGenerator.class));
-    Assert.assertTrue(classes.contains(MergeRollupTaskGenerator.class));
-    
Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class));
-    
Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class));
+    assertTrue(classes.contains(MergeRollupTaskGenerator.class));
+    assertTrue(classes.contains(PurgeTaskGenerator.class));
+    assertTrue(classes.contains(SegmentGenerationAndPushTaskGenerator.class));
+    assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskGenerator.class));
   }
 
   @Test
   public void testTaskExecutorRegistry() {
     Set<Class<?>> classes = 
TaskExecutorFactoryRegistry.getTaskExecutorFactoryClasses();
-    Assert.assertTrue(classes.size() >= 5);
-    
Assert.assertTrue(classes.contains(ConvertToRawIndexTaskExecutorFactory.class));
-    Assert.assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class));
-    Assert.assertTrue(classes.contains(PurgeTaskExecutorFactory.class));
-    
Assert.assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class));
-    
Assert.assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class));
+    assertTrue(classes.contains(MergeRollupTaskExecutorFactory.class));
+    assertTrue(classes.contains(PurgeTaskExecutorFactory.class));
+    
assertTrue(classes.contains(SegmentGenerationAndPushTaskExecutorFactory.class));
+    
assertTrue(classes.contains(RealtimeToOfflineSegmentsTaskExecutorFactory.class));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Reply via email to