This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a9abd143f0 Minion Task to support automatic Segment Refresh (#14300)
a9abd143f0 is described below

commit a9abd143f0e0c8dee991239931d59f4db186093d
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Thu Nov 21 01:04:52 2024 +0530

    Minion Task to support automatic Segment Refresh (#14300)
    
    * Minion Task to support automatic Segment Refresh
    
    * Address review comments
    
    * Address review comments.
---
 .../api/resources/PinotTableRestletResource.java   |   6 +-
 .../helix/core/PinotHelixResourceManager.java      |  16 +-
 .../apache/pinot/core/common/MinionConstants.java  |  25 +-
 ...RefreshSegmentMinionClusterIntegrationTest.java | 463 +++++++++++++++++++++
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |  18 +
 .../refreshsegment/RefreshSegmentTaskExecutor.java | 209 ++++++++++
 .../RefreshSegmentTaskExecutorFactory.java         |  49 +++
 .../RefreshSegmentTaskGenerator.java               | 171 ++++++++
 .../RefreshSegmentTaskProgressObserverFactory.java |  30 +-
 ...ableStats.java => TableStatsHumanReadable.java} |   6 +-
 10 files changed, 961 insertions(+), 32 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 040c57c885..2ece7deb0d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -114,7 +114,7 @@ import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableStats;
+import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
 import org.apache.pinot.spi.config.table.TableStatus;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
@@ -883,13 +883,13 @@ public class PinotTableRestletResource {
     if ((tableTypeStr == null || TableType.OFFLINE.name().equalsIgnoreCase(tableTypeStr))
         && _pinotHelixResourceManager.hasOfflineTable(tableName)) {
       String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
-      TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType);
+      TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType);
       ret.set(TableType.OFFLINE.name(), JsonUtils.objectToJsonNode(tableStats));
     }
     if ((tableTypeStr == null || TableType.REALTIME.name().equalsIgnoreCase(tableTypeStr))
         && _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
       String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-      TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType);
+      TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType);
       ret.set(TableType.REALTIME.name(), JsonUtils.objectToJsonNode(tableStats));
     }
     return ret.toString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 099cf4b5e8..e7affa4287 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -160,7 +160,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableStats;
+import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
@@ -4240,12 +4240,22 @@ public class PinotHelixResourceManager {
     return onlineSegments;
   }
 
-  public TableStats getTableStats(String tableNameWithType) {
+  public TableStatsHumanReadable getTableStatsHumanReadable(String tableNameWithType) {
     String zkPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
     Stat stat = _propertyStore.getStat(zkPath, AccessOption.PERSISTENT);
     Preconditions.checkState(stat != null, "Failed to read ZK stats for table: %s", tableNameWithType);
     String creationTime = SIMPLE_DATE_FORMAT.format(Instant.ofEpochMilli(stat.getCtime()));
-    return new TableStats(creationTime);
+    return new TableStatsHumanReadable(creationTime);
+  }
+
+  public Stat getTableStat(String tableNameWithType) {
+    String zkPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
+    return _propertyStore.getStat(zkPath, AccessOption.PERSISTENT);
+  }
+
+  public Stat getSchemaStat(String schemaName) {
+    String zkPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName);
+    return _propertyStore.getStat(zkPath, AccessOption.PERSISTENT);
   }
 
   /**
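
The new Stat accessors above exist so that the RefreshSegment task generator can
compare ZooKeeper znode modification times against a segment's last refresh time.
A minimal usage sketch (method names are from this patch; the surrounding
variable names are hypothetical):

    // Usage sketch: znode mtimes drive the refresh decision (see
    // RefreshSegmentTaskGenerator later in this patch).
    Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
    Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schemaName);
    long lastConfigChangeMillis = Math.max(tableStat.getMtime(), schemaStat.getMtime());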
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 6ad497a4a3..26e0bd79ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -162,8 +162,29 @@ public class MinionConstants {
   // Generate segment and push to controller based on batch ingestion configs
   public static class SegmentGenerationAndPushTask {
     public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
-    public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
-        "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
+  }
+
+  /**
+   * Minion task to refresh segments when there are changes to table configs or schema. This task currently supports
+   * the following functionality:
+   * 1. Adding/removing/updating indexes.
+   * 2. Adding new columns (also supports transform configs for new columns).
+   * 3. Converting segment versions.
+   * 4. Compatible datatype changes to columns (note that the minion task will fail if the data in the column is not
+   *    compatible with the target datatype).
+   *
+   * This is an alternative to reloading existing segments on servers. The reload on servers is sub-optimal for many
+   * reasons:
+   * 1. It requires an explicit reload call when index configurations change.
+   * 2. It is very slow. It happens one (or a few - configurable) segments at a time to avoid query impact.
+   * 3. The compute cost is paid on all servers hosting the segment.
+   * 4. It increases server startup time as more and more segments require reload.
+   */
+  public static class RefreshSegmentTask {
+    public static final String TASK_TYPE = "RefreshSegmentTask";
+
+    // Maximum number of tasks to create per table per run.
+    public static final int MAX_NUM_TASKS_PER_TABLE = 20;
   }
 
   public static class UpsertCompactionTask {
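
Enabling the task only requires a (possibly empty) entry for "RefreshSegmentTask"
in the table's task config, as the integration test below does. A minimal sketch,
assuming a TableConfig instance named tableConfig is at hand; the
TABLE_MAX_NUM_TASKS_KEY entry is the optional per-table override of
MAX_NUM_TASKS_PER_TABLE read by the task generator:

    // Sketch: enable RefreshSegmentTask on a table, capping the number of
    // tasks generated per run at 10 (defaults to MAX_NUM_TASKS_PER_TABLE).
    Map<String, String> refreshTaskConfigs = new HashMap<>();
    refreshTaskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10");
    tableConfig.setTaskConfig(new TableTaskConfig(
        Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, refreshTaskConfigs)));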
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..7f91a8671e
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+  protected PinotTaskManager _taskManager;
+  protected PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir");
+  protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir");
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startMinion();
+
+    // Create schema and tableConfig
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    tableConfig.setTaskConfig(getRefreshSegmentTaskConfig());
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+    // Create segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir,
+        _segmentTarDir);
+    uploadSegments(getTableName(), _segmentTarDir);
+
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Test(priority = 1)
+  public void testFirstSegmentRefresh() {
+    // This will create the inverted index as we disable inverted index creation during segment push.
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
+    // Will not schedule task if there's an incomplete task
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    Map<String, String> segmentRefreshTime = new HashMap<>();
+    String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Get the value in segment metadata
+      Map<String, String> customMap = metadata.getCustomMap();
+      assertTrue(customMap.containsKey(refreshKey));
+      segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey));
+    }
+
+    // This should be a no-op as nothing changes.
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Get the value in segment metadata
+      Map<String, String> customMap = metadata.getCustomMap();
+      assertTrue(
+          customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+      assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey),
+          "Refresh Time doesn't match");
+    }
+  }
+
+  @Test(priority = 2)
+  public void testValidDatatypeChange() throws Exception {
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    // Change datatypes: ArrTime INT -> LONG, AirlineID LONG -> STRING, ActualElapsedTime INT -> FLOAT,
+    // DestAirportID INT -> STRING
+    deleteSchema(getTableName());
+    Schema schema = createSchema();
+    schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.LONG);
+    schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.STRING);
+    schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.FLOAT);
+    schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING);
+    addSchema(schema);
+
+    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
+    // Will not schedule task if there's an incomplete task
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String query = "SELECT ArrTime FROM mytable LIMIT 10";
+        JsonNode response = postQuery(query);
+        return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("LONG");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String query = "SELECT AirlineID FROM mytable LIMIT 10";
+        JsonNode response = postQuery(query);
+        return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("STRING");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String query = "SELECT ActualElapsedTime FROM mytable LIMIT 10";
+        JsonNode response = postQuery(query);
+        return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("FLOAT");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String query = "SELECT DestAirportID FROM mytable LIMIT 10";
+        JsonNode response = postQuery(query);
+        return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("STRING");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    // Reset the schema back to its original state.
+    deleteSchema(getTableName());
+    schema = createSchema();
+    schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.INT);
+    schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.LONG);
+    schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.INT);
+    schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.INT);
+    addSchema(schema);
+  }
+
+  @Test(priority = 3)
+  public void testIndexChanges() throws Exception {
+    /**
+     * Adding bare-minimum tests for addition and removal of indexes. The segment generation code already
+     * has enough tests and testing each index addition/removal does not seem necessary.
+     */
+
+    // Current inverted index columns are "FlightNum", "Origin", "Quarter"
+    String query = "SELECT * FROM mytable WHERE flightNum = 3151 LIMIT 10";
+    assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L);
+    query = "SELECT * from mytable where Origin = 'SFO' LIMIT 10";
+    assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L);
+    query = "SELECT * from mytable where Quarter = 1 LIMIT 10";
+    assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L);
+
+    TableConfig tableConfig = getOfflineTableConfig();
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    // Add inverted index for "DivActualElapsedTime"
+    // Remove inverted index for "FlightNum"
+    indexingConfig.setInvertedIndexColumns(Arrays.asList("DivActualElapsedTime", "Origin", "Quarter"));
+    updateTableConfig(tableConfig);
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
+    // Will not schedule task if there's an incomplete task
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String newQuery = "SELECT * FROM mytable where flightNum = 3151 LIMIT 10";
+        return postQuery(newQuery).get("numEntriesScannedInFilter").asLong() > 0;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String newQuery = "SELECT * FROM mytable where DivActualElapsedTime = 305 LIMIT 10";
+        return postQuery(newQuery).get("numEntriesScannedInFilter").asLong() == 0;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test(priority = 4)
+  public void checkColumnAddition() throws Exception {
+    long numTotalDocs = getCountStarResult();
+    Schema schema = createSchema();
+    schema.addField(new MetricFieldSpec("NewAddedIntMetric", 
FieldSpec.DataType.INT, 1));
+    schema.addField(new MetricFieldSpec("NewAddedLongMetric", 
FieldSpec.DataType.LONG, 1));
+    schema.addField(new MetricFieldSpec("NewAddedFloatMetric", 
FieldSpec.DataType.FLOAT));
+    schema.addField(new MetricFieldSpec("NewAddedDoubleMetric", 
FieldSpec.DataType.DOUBLE));
+    schema.addField(new MetricFieldSpec("NewAddedBigDecimalMetric", 
FieldSpec.DataType.BIG_DECIMAL));
+    schema.addField(new MetricFieldSpec("NewAddedBytesMetric", 
FieldSpec.DataType.BYTES));
+    schema.addField(new DimensionFieldSpec("NewAddedMVIntDimension", 
FieldSpec.DataType.INT, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVLongDimension", 
FieldSpec.DataType.LONG, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVFloatDimension", 
FieldSpec.DataType.FLOAT, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVDoubleDimension", 
FieldSpec.DataType.DOUBLE, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVBooleanDimension", 
FieldSpec.DataType.BOOLEAN, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVTimestampDimension", 
FieldSpec.DataType.TIMESTAMP, false));
+    schema.addField(new DimensionFieldSpec("NewAddedMVStringDimension", 
FieldSpec.DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec("NewAddedSVJSONDimension", 
FieldSpec.DataType.JSON, true));
+    schema.addField(new DimensionFieldSpec("NewAddedSVBytesDimension", 
FieldSpec.DataType.BYTES, true));
+    schema.addField(
+        new DateTimeFieldSpec("NewAddedDerivedHoursSinceEpoch", 
FieldSpec.DataType.INT, "EPOCH|HOURS", "1:DAYS"));
+    schema.addField(
+        new DateTimeFieldSpec("NewAddedDerivedTimestamp", 
FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS"));
+    schema.addField(new 
DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", 
FieldSpec.DataType.BOOLEAN, true));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", 
FieldSpec.DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs", 
FieldSpec.DataType.INT, false));
+    schema.addField(new 
DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString", 
FieldSpec.DataType.STRING, false));
+    schema.addField(new 
DimensionFieldSpec("NewAddedRawDerivedStringDimension", 
FieldSpec.DataType.STRING, true));
+    schema.addField(new DimensionFieldSpec("NewAddedRawDerivedMVIntDimension", 
FieldSpec.DataType.INT, false));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedMVDoubleDimension", 
FieldSpec.DataType.DOUBLE, false));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedNullString", 
FieldSpec.DataType.STRING, true, "nil"));
+    schema.setEnableColumnBasedNullHandling(true);
+    addSchema(schema);
+
+    TableConfig tableConfig = getOfflineTableConfig();
+    List<TransformConfig> transformConfigs =
+        Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
+            new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
+            new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
+            new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"),
+            new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"),
+            new TransformConfig("NewAddedRawDerivedMVIntDimension", "ActualElapsedTime"),
+            new TransformConfig("NewAddedDerivedMVDoubleDimension", "ArrDelayMinutes"),
+            new TransformConfig("NewAddedDerivedNullString", "caseWhen(true, null, null)"));
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    tableConfig.setIngestionConfig(ingestionConfig);
+
+    // Ensure that we can reload segments with a new raw derived column
+    tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedStringDimension");
+    tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedMVIntDimension");
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    fieldConfigList.add(
+        new FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+            FieldConfig.CompressionCodec.MV_ENTRY_DICT, null));
+    fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY,
+        Collections.emptyList(), FieldConfig.CompressionCodec.MV_ENTRY_DICT, null));
+    updateTableConfig(tableConfig);
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
+    // Will not schedule task if there's an incomplete task
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains processed times.
+    String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Get the value in segment metadata
+      Map<String, String> customMap = metadata.getCustomMap();
+      assertTrue(customMap.containsKey(refreshKey));
+    }
+
+    waitForServerSegmentDownload(aVoid -> {
+      try {
+        String query = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric = 
1";
+        JsonNode response = postQuery(query);
+        return response.get("resultTable").get("rows").get(0).get(0).asLong() 
== numTotalDocs;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    // Verify the index sizes
+    JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
+            _controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
+                List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDsString",
+                    "NewAddedRawDerivedStringDimension", "NewAddedRawDerivedMVIntDimension",
+                    "NewAddedDerivedNullString"))))
+        .get("columnIndexSizeMap");
+    assertEquals(columnIndexSizeMap.size(), 6);
+    JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs");
+    JsonNode derivedColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDs");
+    JsonNode derivedStringColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDsString");
+    JsonNode derivedRawStringColumnIndex = columnIndexSizeMap.get("NewAddedRawDerivedStringDimension");
+    JsonNode derivedRawMVIntColumnIndex = columnIndexSizeMap.get("NewAddedRawDerivedMVIntDimension");
+    JsonNode derivedNullStringColumnIndex = columnIndexSizeMap.get("NewAddedDerivedNullString");
+
+    // Derived int column should have the same dictionary size as the original column
+    double originalColumnDictionarySize = originalColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble();
+    assertEquals(derivedColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble(), originalColumnDictionarySize);
+
+    // Derived string column should have larger dictionary size than the original column
+    assertTrue(
+        derivedStringColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble() > originalColumnDictionarySize);
+
+    // Both derived columns should have smaller forward index size than the original column because of compression
+    double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble();
+    assertEquals(derivedStringColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(),
+        derivedColumnForwardIndexSize);
+
+    assertTrue(derivedRawStringColumnIndex.has(StandardIndexes.FORWARD_ID));
+    assertFalse(derivedRawStringColumnIndex.has(StandardIndexes.DICTIONARY_ID));
+
+    assertTrue(derivedRawMVIntColumnIndex.has(StandardIndexes.FORWARD_ID));
+    assertFalse(derivedRawMVIntColumnIndex.has(StandardIndexes.DICTIONARY_ID));
+
+    assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID));
+  }
+
+  @Test(priority = 5)
+  public void checkRefreshNotNecessary() throws Exception {
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    Map<String, Long> segmentCrc = new HashMap<>();
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      segmentCrc.put(metadata.getSegmentName(), metadata.getCrc());
+    }
+
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.setQuotaConfig(new QuotaConfig(null, "10"));
+
+    updateTableConfig(tableConfig);
+
+    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
+    // Will not schedule task if there's an incomplete task
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    Map<String, String> segmentRefreshTime = new HashMap<>();
+
+    String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Get the value in segment metadata
+      Map<String, String> customMap = metadata.getCustomMap();
+      assertTrue(customMap.containsKey(refreshKey));
+      segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey));
+      assertEquals(segmentCrc.get(metadata.getSegmentName()), metadata.getCrc(), "CRC does not match");
+    }
+
+    // This should be a no-op as nothing changes.
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Get the value in segment metadata
+      Map<String, String> customMap = metadata.getCustomMap();
+      assertTrue(
+          customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+      assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey),
+          "Refresh Time doesn't match");
+    }
+  }
+
+  protected void waitForTaskToComplete() {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.RefreshSegmentTask.TASK_TYPE)
+          .values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+  }
+
+  protected void waitForServerSegmentDownload(Function<Void, Boolean> conditionFunc) {
+    TestUtils.waitForCondition(aVoid -> {
+      boolean val = conditionFunc.apply(aVoid);
+      return val;
+    }, 60_000L, "Failed to meet condition");
+  }
+
+  private TableTaskConfig getRefreshSegmentTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, tableTaskConfigs));
+  }
+}
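
Condensed, the trigger-and-wait flow the tests above repeat looks like this (a
sketch reusing the test's own helpers and fields):

    // Schedule RefreshSegmentTask for one table and block until the minion
    // completes it. A null entry in the returned map means nothing was
    // scheduled, e.g. because a previous run is still incomplete.
    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
    if (_taskManager.scheduleAllTasksForTable(offlineTableName, null)
        .get(MinionConstants.RefreshSegmentTask.TASK_TYPE) != null) {
      waitForTaskToComplete();
    }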
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 55dfb97f98..5e41720cde 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -19,10 +19,14 @@
 package org.apache.pinot.plugin.minion.tasks;
 
 import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
@@ -54,6 +58,9 @@ public class MinionTaskUtils {
 
   private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
 
+  public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+  public static final String UTC = "UTC";
+
   private MinionTaskUtils() {
   }
 
@@ -235,4 +242,15 @@ public class MinionTaskUtils {
     }
     return validDocIds;
   }
+
+  public static String toUTCString(long epochMillis) {
+    Date date = new Date(epochMillis);
+    SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
+    isoFormat.setTimeZone(TimeZone.getTimeZone(UTC));
+    return isoFormat.format(date);
+  }
+
+  public static long fromUTCString(String utcString) {
+    return Instant.parse(utcString).toEpochMilli();
+  }
 }
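
The two new helpers round-trip exactly: DATETIME_PATTERN produces an ISO-8601
UTC timestamp with millisecond precision (e.g. "2024-11-20T19:34:52.123Z"),
which java.time.Instant.parse() reads back. A quick sketch:

    // Round trip: epoch millis -> ISO-8601 UTC string -> epoch millis.
    long nowMillis = System.currentTimeMillis();
    String utc = MinionTaskUtils.toUTCString(nowMillis);
    long parsed = MinionTaskUtils.fromUTCString(utc);
    assert parsed == nowMillis;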
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
new file mode 100644
index 0000000000..2509ba3721
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.refreshsegment;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RefreshSegmentTaskExecutor extends BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskExecutor.class);
+
+  private long _taskStartTime;
+
+  /**
+   * The code here currently covers segment refresh for the following cases:
+   * 1. Process newly added columns.
+   * 2. Addition/removal of indexes.
+   * 3. Compatible datatype change for existing columns
+   */
+  @Override
+  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
+      throws Exception {
+    _eventObserver.notifyProgress(pinotTaskConfig, "Refreshing segment: " + indexDir);
+
+    // We set _taskStartTime before fetching the tableConfig. Task generation relies on tableConfig/schema updates
+    // happening after the last processed time, so we explicitly use the timestamp before fetching the tableConfig as
+    // the processedTime.
+    _taskStartTime = System.currentTimeMillis();
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String taskType = pinotTaskConfig.getTaskType();
+
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    PinotConfiguration segmentDirectoryConfigs = indexLoadingConfig.getSegmentDirectoryConfigs();
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
+            .setSchema(schema)
+            .setInstanceId(indexLoadingConfig.getInstanceId())
+            .setSegmentName(segmentMetadata.getName())
+            .setSegmentCrc(segmentMetadata.getCrc())
+            .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
+            .build();
+    SegmentDirectory segmentDirectory =
+        SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(), segmentLoaderContext);
+
+    // TODO: Instead of relying on needPreprocess(), process the segment metadata file to determine if a refresh is
+    // needed. BaseDefaultColumnHandler, as part of needPreprocess(), does not process any changes to existing columns
+    // like datatype, change from dimension to metric, etc.
+    boolean needPreprocess = ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema);
+    closeSegmentDirectoryQuietly(segmentDirectory);
+    Set<String> refreshColumnSet = new HashSet<>();
+
+    for (FieldSpec fieldSpecInSchema : schema.getAllFieldSpecs()) {
+      // Virtual columns are constructed while loading the segment, thus do not exist in the record, nor should they
+      // be persisted to disk.
+      if (fieldSpecInSchema.isVirtualColumn()) {
+        continue;
+      }
+
+      String column = fieldSpecInSchema.getName();
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null) {
+        FieldSpec fieldSpecInSegment = columnMetadata.getFieldSpec();
+
+        // Check that the data type and default value match.
+        FieldSpec.DataType dataTypeInSegment = fieldSpecInSegment.getDataType();
+        FieldSpec.DataType dataTypeInSchema = fieldSpecInSchema.getDataType();
+
+        // Column exists in segment.
+        if (dataTypeInSegment != dataTypeInSchema) {
+          // Check if we need to update the data type. The data type change depends on the segment generation code
+          // converting the object to the destination data type. If the existing data in the column is not compatible
+          // with the destination data type, the refresh task will fail.
+          refreshColumnSet.add(column);
+        }
+
+        // TODO: Maybe we can support single-value to multi-value conversions and vice-versa.
+      } else {
+        refreshColumnSet.add(column);
+      }
+    }
+
+    if (!needPreprocess && refreshColumnSet.isEmpty()) {
+      LOGGER.info("Skipping segment={}, table={} as it is up-to-date with the new table config/schema", segmentName,
+          tableNameWithType);
+      // We just need to update the ZK metadata with the last refresh time to avoid getting picked up again. As the
+      // CRC check will match, this will only end up being a ZK update.
+      return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType)
+          .setFile(indexDir)
+          .setSegmentName(segmentName)
+          .build();
+    }
+
+    // Refresh the segment. The refresh is achieved by generating a new segment from scratch using the updated schema
+    // and table config.
+    try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
+      recordReader.init(indexDir, null, null);
+      SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName,
+          getSchema(tableNameWithType));
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config, recordReader);
+      driver.build();
+    }
+
+    File refreshedSegmentFile = new File(workingDir, segmentName);
+    SegmentConversionResult result = new SegmentConversionResult.Builder().setFile(refreshedSegmentFile)
+        .setTableNameWithType(tableNameWithType)
+        .setSegmentName(segmentName)
+        .build();
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs,
+        (endMillis - _taskStartTime));
+
+    return result;
+  }
+
+  private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
+      SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
+    // Inverted index creation is disabled by default during segment generation, typically to reduce segment push
+    // times from external sources like HDFS. Also, without creating the inverted index here, the segment would always
+    // be flagged as needReload, causing the segment refresh to take place again.
+    tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(workingDir.getPath());
+    config.setSegmentName(segmentName);
+
+    // Keep the index creation time the same as the original segment because both segments use the same raw data.
+    // This way, for the REFRESH case, when the new segment gets pushed to the controller, we can use the index
+    // creation time to identify whether the newly pushed segment has newer data than the existing one.
+    config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
+
+    // The time column type info is not stored in the segment metadata.
+    // Keep the segment start/end time to properly handle time column types other than EPOCH (e.g. SIMPLE_FORMAT).
+    if (segmentMetadata.getTimeInterval() != null) {
+      config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
+      config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
+      config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
+      config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
+    }
+    return config;
+  }
+
+  private static void closeSegmentDirectoryQuietly(SegmentDirectory segmentDirectory) {
+    if (segmentDirectory != null) {
+      try {
+        segmentDirectory.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close SegmentDirectory due to error: {}", e.getMessage());
+      }
+    }
+  }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+      SegmentConversionResult segmentConversionResult) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+        Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
+            MinionTaskUtils.toUTCString(_taskStartTime)));
+  }
+}
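
Stripped of the needPreprocess() and virtual-column handling, the column diff in
convert() reduces to a small predicate; a simplified sketch using only calls
that appear in the executor above:

    // Simplified sketch: a column needs refreshing if it is new (absent from
    // the segment metadata) or its stored datatype differs from the schema's.
    private static boolean columnNeedsRefresh(Schema schema, SegmentMetadataImpl segmentMetadata, String column) {
      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
      return columnMetadata == null
          || columnMetadata.getFieldSpec().getDataType() != schema.getFieldSpecFor(column).getDataType();
    }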
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java
new file mode 100644
index 0000000000..5214d46645
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.refreshsegment;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
+public class RefreshSegmentTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.RefreshSegmentTask.TASK_TYPE;
+  }
+
+  @Override
+  public PinotTaskExecutor create() {
+    return new RefreshSegmentTaskExecutor();
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
new file mode 100644
index 0000000000..59e85c1b1e
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.refreshsegment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RefreshSegmentTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class RefreshSegmentTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskGenerator.class);
+
+  @Override
+  public String getTaskType() {
+    return RefreshSegmentTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RefreshSegmentTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();
+
+    int tableNumTasks = 0;
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating RefreshSegment tasks for table: {}", 
tableNameWithType);
+
+      // Get the task configs for the table. This is used to restrict the maximum number of allowed tasks per table
+      // at any given point.
+      Map<String, String> taskConfigs;
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      if (tableTaskConfig == null) {
+        LOGGER.warn("Failed to find task config for table: {}", 
tableNameWithType);
+        continue;
+      }
+      taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType);
+      int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+      String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+          LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and 
task {}", tableNameWithType, taskType);
+        }
+      }
+
+      // Get info about table and schema.
+      Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
+      Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
+
+      // Get the running segments for a table.
+      Set<Segment> runningSegments =
+          TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, 
_clusterInfoAccessor);
+
+      // Make a single ZK call to get the segments.
+      List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+        // Skip if we have reached the maximum number of permissible tasks per iteration.
+        if (tableNumTasks >= tableMaxNumTasks) {
+          break;
+        }
+
+        // Skip consuming segments.
+        if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) {
+          continue;
+        }
+
+        // Skip segments for which a task is already running.
+        if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) {
+          continue;
+        }
+
+        String segmentName = segmentZKMetadata.getSegmentName();
+
+        // Skip if the segment is already up-to-date and doesn't have to be refreshed.
+        if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) {
+          continue;
+        }
+
+        Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segmentZKMetadata.getCrc()));
+        pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+        tableNumTasks++;
+      }
+
+      LOGGER.info("Finished generating {} tasks configs for table: {} " + "for 
task: {}", tableNumTasks,
+          tableNameWithType, taskType);
+    }
+
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * A refresh is not needed when there were no tableConfig or schema updates after the last time the segment was
+   * refreshed by this task.
+   *
+   * Note that segments created after the latest tableConfig/schema update will still be refreshed. This is because
+   * inverted index creation is disabled by default during segment generation. This can be added as an additional
+   * check in the future, if required.
+   */
+  private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, Stat tableStat,
+      Stat schemaStat) {
+    String tableNameWithType = tableConfig.getTableName();
+    String timestampKey = RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+
+    long lastProcessedTime = 0L;
+    if (segmentZKMetadata.getCustomMap() != null && segmentZKMetadata.getCustomMap().containsKey(timestampKey)) {
+      lastProcessedTime = MinionTaskUtils.fromUTCString(segmentZKMetadata.getCustomMap().get(timestampKey));
+    }
+
+    if (tableStat == null || schemaStat == null) {
+      LOGGER.warn("Table or schema stat is null for table: {}", tableNameWithType);
+      return false;
+    }
+
+    long tableMTime = tableStat.getMtime();
+    long schemaMTime = schemaStat.getMtime();
+
+    // TODO: See comment above - add this later if required.
+    // boolean segmentCreatedBeforeUpdate =
+    //     tableMTime > segmentZKMetadata.getCreationTime() || schemaMTime > segmentZKMetadata.getCreationTime();
+
+    boolean segmentProcessedBeforeUpdate = tableMTime > lastProcessedTime || schemaMTime > lastProcessedTime;
+    return segmentProcessedBeforeUpdate;
+  }
+}
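
A worked example of the shouldRefreshSegment() gate, with illustrative epoch
millis:

    // t=1000  minion refreshes the segment; customMap[timestampKey] = toUTCString(1000)
    // t=1500  schema znode edited; schemaStat.getMtime() == 1500
    // next run: 1500 > 1000, so the segment is scheduled for refresh again.
    // After that refresh completes at t=1600, neither mtime exceeds 1600, so
    // subsequent runs skip the segment (the no-op behavior the tests assert).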
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java
similarity index 59%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java
copy to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java
index ea944cc39c..b10db94901 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java
@@ -16,30 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pinot.plugin.minion.tasks.refreshsegment;
 
-package org.apache.pinot.spi.config.table;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 
+@EventObserverFactory
+public class RefreshSegmentTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {
 
-/*
- * Container object for metadata info / stats of Pinot tables
- */
-public class TableStats {
-  public static final String CREATION_TIME_KEY = "creationTime";
-
-  private String _creationTime;
-
-  public TableStats(String creationTime) {
-    _creationTime = creationTime;
-  }
-
-  @JsonProperty(CREATION_TIME_KEY)
-  public String getCreationTime() {
-    return _creationTime;
-  }
-
-  public void setCreationTime(String creationTime) {
-    _creationTime = creationTime;
+  @Override
+  public String getTaskType() {
+    return MinionConstants.RefreshSegmentTask.TASK_TYPE;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java
similarity index 87%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java
index ea944cc39c..1c133f2868 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java
@@ -23,14 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 
 /*
- * Container object for metadata info / stats of Pinot tables
+ * Container object for human-readable metadata info / stats of Pinot tables
  */
-public class TableStats {
+public class TableStatsHumanReadable {
   public static final String CREATION_TIME_KEY = "creationTime";
 
   private String _creationTime;
 
-  public TableStats(String creationTime) {
+  public TableStatsHumanReadable(String creationTime) {
     _creationTime = creationTime;
   }
 

