This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c7e835f961 Realtime table consumption resume API (#8663)
c7e835f961 is described below

commit c7e835f9613d7bcc94c5e68c180dfb225b5acc1b
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Tue May 24 22:32:36 2022 +0530

    Realtime table consumption resume API (#8663)
---
 .../common/messages/RunPeriodicTaskMessage.java    |  13 ++-
 ...ControllerUserDefinedMessageHandlerFactory.java |  13 ++-
 .../pinot/controller/api/resources/Constants.java  |   2 +
 ...PinotControllerPeriodicTaskRestletResource.java |  38 +-------
 .../api/resources/PinotRealtimeTableResource.java  |  65 +++++++++++++
 .../controller/helix/SegmentStatusChecker.java     |   3 +-
 .../helix/core/PinotHelixResourceManager.java      |  38 ++++++++
 .../helix/core/minion/PinotTaskManager.java        |   3 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  13 ++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 101 +++++++++++++--------
 .../BrokerResourceValidationManager.java           |   3 +-
 .../RealtimeSegmentValidationManager.java          |  12 ++-
 .../periodictask/ControllerPeriodicTaskTest.java   |   5 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   4 +-
 14 files changed, 225 insertions(+), 88 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
index 11ee6cf56c..4e3178fe41 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.messages;
 
+import java.util.Map;
 import java.util.UUID;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.Message;
@@ -32,13 +33,15 @@ public class RunPeriodicTaskMessage extends Message {
   private static final String PERIODIC_TASK_REQUEST_ID = "requestId";
   private static final String PERIODIC_TASK_NAME_KEY = "taskName";
   private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
+  private static final String TASK_PROPERTIES = "taskProperties";
 
   /**
    * @param taskRequestId Request Id that will be appended to log messages.
    * @param periodicTaskName Name of the task that will be run.
    * @param tableNameWithType Table (names with type suffix) on which task 
will run.
    */
-  public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, 
String tableNameWithType) {
+  public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, 
String tableNameWithType,
+      Map<String, String> taskProperties) {
     super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
     setMsgSubType(RUN_PERIODIC_TASK_MSG_SUB_TYPE);
     setExecutionTimeout(-1);
@@ -46,6 +49,10 @@ public class RunPeriodicTaskMessage extends Message {
     znRecord.setSimpleField(PERIODIC_TASK_REQUEST_ID, taskRequestId);
     znRecord.setSimpleField(PERIODIC_TASK_NAME_KEY, periodicTaskName);
     znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType);
+
+    if (taskProperties != null) {
+      znRecord.setMapField(TASK_PROPERTIES, taskProperties);
+    }
   }
 
   public RunPeriodicTaskMessage(Message message) {
@@ -63,4 +70,13 @@ public class RunPeriodicTaskMessage extends Message {
   public String getTableNameWithType() {
     return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY);
   }
+
+  /**
+   * Returns the extra properties to pass along to the periodic task.
+   *
+   * @return the task properties, or {@code null} if none were set (the constructor only stores a non-null map)
+   */
+  public Map<String, String> getTaskProperties() {
+    return getRecord().getMapField(TASK_PROPERTIES);
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
index fd9dfb843b..8adbaff9c6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller;
 
+import java.util.Map;
 import java.util.Properties;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -71,6 +72,7 @@ public class ControllerUserDefinedMessageHandlerFactory 
implements MessageHandle
     private final String _periodicTaskRequestId;
     private final String _periodicTaskName;
     private final String _tableNameWithType;
+    private final Map<String, String> _taskProperties;
     private final PeriodicTaskScheduler _periodicTaskScheduler;
 
     RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, 
NotificationContext context,
@@ -79,6 +81,7 @@ public class ControllerUserDefinedMessageHandlerFactory 
implements MessageHandle
       _periodicTaskRequestId = message.getPeriodicTaskRequestId();
       _periodicTaskName = message.getPeriodicTaskName();
       _tableNameWithType = message.getTableNameWithType();
+      _taskProperties = message.getTaskProperties();
       _periodicTaskScheduler = periodicTaskScheduler;
     }
 
@@ -88,7 +91,8 @@ public class ControllerUserDefinedMessageHandlerFactory 
implements MessageHandle
       LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by 
executing task {}", _periodicTaskRequestId,
           _periodicTaskName);
       _periodicTaskScheduler
-          .scheduleNow(_periodicTaskName, 
createTaskProperties(_periodicTaskRequestId, _tableNameWithType));
+          .scheduleNow(_periodicTaskName, 
createTaskProperties(_periodicTaskRequestId, _tableNameWithType,
+              _taskProperties));
       HelixTaskResult helixTaskResult = new HelixTaskResult();
       helixTaskResult.setSuccess(true);
       return helixTaskResult;
@@ -99,7 +103,8 @@ public class ControllerUserDefinedMessageHandlerFactory 
implements MessageHandle
       LOGGER.error("[TaskRequestId: {}] Message handling error.", 
_periodicTaskRequestId, e);
     }
 
-    private static Properties createTaskProperties(String 
periodicTaskRequestId, String tableNameWithType) {
+    private static Properties createTaskProperties(String 
periodicTaskRequestId, String tableNameWithType,
+        Map<String, String> taskProperties) {
       Properties periodicTaskParameters = new Properties();
       if (periodicTaskRequestId != null) {
         
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID, 
periodicTaskRequestId);
@@ -109,6 +114,10 @@ public class ControllerUserDefinedMessageHandlerFactory 
implements MessageHandle
         
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, 
tableNameWithType);
       }
 
+      if (taskProperties != null) {
+        taskProperties.forEach(periodicTaskParameters::setProperty);
+      }
+
       return periodicTaskParameters;
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 109bdfc565..513b12f9d3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -51,6 +51,8 @@ public class Constants {
   public static final String PERIODIC_TASK_TAG = "PeriodicTask";
   public static final String UPSERT_RESOURCE_TAG = "Upsert";
 
+  public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = 
"RealtimeSegmentValidationManager";
+
   public static TableType validateTableType(String tableTypeStr) {
     if (StringUtils.isBlank(tableTypeStr)) {
       return null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
index 99a597bbc4..8566d4509c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -22,7 +22,6 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.util.List;
-import java.util.UUID;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -31,13 +30,9 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
-import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +42,6 @@ import org.slf4j.LoggerFactory;
 public class PinotControllerPeriodicTaskRestletResource {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
-  private static final String API_REQUEST_ID_PREFIX = "api-";
 
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
@@ -84,33 +78,11 @@ public class PinotControllerPeriodicTaskRestletResource {
       tableName = matchingTableNamesWithType.get(0);
     }
 
-    // Generate an id for this request by taking first eight characters of a 
randomly generated UUID. This request id
-    // is returned to the user and also appended to log messages so that user 
can locate all log messages associated
-    // with this PeriodicTask's execution.
-    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0, 8);
+    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
+        .invokeControllerPeriodicTask(tableName, periodicTaskName, null);
 
-    LOGGER.info(
-        "[TaskRequestId: {}] Sending periodic task message to all controllers 
for running task {} against {}.",
-        periodicTaskRequestId, periodicTaskName, tableName != null ? " table 
'" + tableName + "'" : "all tables");
-
-    // Create and send message to send to all controllers (including this one)
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
-    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
-    recipientCriteria.setSelfExcluded(false);
-    RunPeriodicTaskMessage runPeriodicTaskMessage =
-        new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, 
tableName);
-
-    ClusterMessagingService clusterMessagingService =
-        _pinotHelixResourceManager.getHelixZkManager().getMessagingService();
-    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
-    LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId,
-        messageCount);
-
-    return "{\"Log Request Id\": \"" + periodicTaskRequestId + 
"\",\"Controllers notified\":" + (messageCount > 0)
-        + "}";
+    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
+        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 
0) + "}";
   }
 
   @GET
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
new file mode 100644
index 0000000000..c9161592a4
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.HashMap;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+@Api(tags = Constants.TABLE_TAG)
+@Path("/")
+public class PinotRealtimeTableResource {
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @POST
+  @Path("/tables/{tableName}/resumeConsumption")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resume the consumption of a realtime table",
+      notes = "Triggers RealtimeSegmentValidationManager on the table to recreate deleted CONSUMING segments")
+  public String resumeConsumption(
+      @ApiParam(value = "Name of the table", required = true)
+      @PathParam("tableName") String tableName) throws JsonProcessingException {
+    // Run RealtimeSegmentValidationManager on this realtime table with deleted-consuming-segment recreation enabled
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    Map<String, String> taskProperties = new HashMap<>();
+    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+
+    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
+        .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+
+    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
+        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index a05dac5f9b..bf9f179925 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -95,7 +96,7 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
   }
 
   @Override
-  protected Context preprocess() {
+  protected Context preprocess(Properties periodicTaskProperties) {
     Context context = new Context();
     // check if we need to log disabled tables log messages
     long now = System.currentTimeMillis();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cccf6d348e..dfd0510ae2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -55,6 +56,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
@@ -90,6 +92,7 @@ import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
@@ -165,6 +168,7 @@ public class PinotHelixResourceManager {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   public static final String APPEND = "APPEND";
   private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
+  private static final String API_REQUEST_ID_PREFIX = "api-";
 
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 
60_000L; // 10 minutes
@@ -3560,6 +3564,40 @@ public class PinotHelixResourceManager {
     return tableConfig.getValidationConfig().getReplicationNumber();
   }
 
+  /**
+   * Triggers a controller periodic task on all controllers by sending a RunPeriodicTaskMessage through Helix.
+   * @param tableName Name of table (with type suffix) to run the task against, or null to run against all tables
+   * @param periodicTaskName Name of the periodic task to run
+   * @param taskProperties Extra properties passed along to the task; may be null
+   * @return Request id (for locating this run's log messages), paired with the number of messages sent
+   */
+  public Pair<String, Integer> invokeControllerPeriodicTask(String tableName, String periodicTaskName,
+      Map<String, String> taskProperties) {
+    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8);
+
+    LOGGER.info(
+        "[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {},"
+            + " with properties {}.", periodicTaskRequestId, periodicTaskName,
+        tableName != null ? "table '" + tableName + "'" : "all tables", taskProperties);
+
+    // Create and send message to send to all controllers (including this one)
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setSessionSpecific(true);
+    recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    recipientCriteria.setSelfExcluded(false);
+    RunPeriodicTaskMessage runPeriodicTaskMessage =
+        new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName, taskProperties);
+
+    ClusterMessagingService clusterMessagingService = getHelixZkManager().getMessagingService();
+    int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1);
+
+    LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId,
+        messageCount);
+    return Pair.of(periodicTaskRequestId, messageCount);
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 522cc5e077..d7c73b7ac1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -587,7 +588,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   }
 
   @Override
-  protected void processTables(List<String> tableNamesWithType) {
+  protected void processTables(List<String> tableNamesWithType, Properties 
taskProperties) {
     scheduleTasks(tableNamesWithType, true);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 439f8be45c..0e737b2e16 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -62,7 +62,6 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
     try {
       // Check if we have a specific table against which this task needs to be 
run.
       String propTableNameWithType = (String) 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
-
       // Process the tables that are managed by this controller
       List<String> tablesToProcess = new ArrayList<>();
       List<String> nonLeaderForTables = new ArrayList<>();
@@ -83,7 +82,7 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
       }
 
       if (!tablesToProcess.isEmpty()) {
-        processTables(tablesToProcess);
+        processTables(tablesToProcess, periodicTaskProperties);
       }
       if (!nonLeaderForTables.isEmpty()) {
         nonLeaderCleanup(nonLeaderForTables);
@@ -103,10 +102,10 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
    * <p>
    * Override one of this method, {@link #processTable(String)} or {@link 
#processTable(String, C)}.
    */
-  protected void processTables(List<String> tableNamesWithType) {
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
     int numTables = tableNamesWithType.size();
     LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
-    C context = preprocess();
+    C context = preprocess(periodicTaskProperties);
     int numTablesProcessed = 0;
     for (String tableNameWithType : tableNamesWithType) {
       if (!isStarted()) {
@@ -129,14 +128,14 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
   /**
    * Can be overridden to provide context before processing the tables.
    */
-  protected C preprocess() {
+  protected C preprocess(Properties periodicTaskProperties) {
     return null;
   }
 
   /**
    * Processes the given table.
    * <p>
-   * Override one of this method, {@link #processTable(String)} or {@link 
#processTables(List)}.
+   * Override one of this method, {@link #processTable(String)} or {@link 
#processTables(List, Properties)}.
    */
   protected void processTable(String tableNameWithType, C context) {
     processTable(tableNameWithType);
@@ -145,7 +144,7 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
   /**
    * Processes the given table.
    * <p>
-   * Override one of this method, {@link #processTable(String, C)} or {@link 
#processTables(List)}.
+   * Override one of this method, {@link #processTable(String, C)} or {@link 
#processTables(List, Properties)}.
    */
   protected void processTable(String tableNameWithType) {
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2d07e20be0..1a7428687b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -859,10 +859,18 @@ public class PinotLLCRealtimeSegmentManager {
    * If the controller fails before step-1, the server will see this as an 
upload failure, and will re-try.
    * @param tableConfig
    *
+   * If the consuming segment is deleted by the user, intentionally or by mistake:
+   * check whether the latest segment in the PROPERTYSTORE has status DONE with no newer IN_PROGRESS segment,
+   * and all replicas of that latest segment are ONLINE in the IDEALSTATE.
+   * If so, create a new CONSUMING segment for the partition.
+   * This is done only if {@code recreateDeletedConsumingSegment} is true, i.e. when manually triggered by an
+   * admin rather than by the automatic periodic task.
+   *
    * TODO: We need to find a place to detect and update a gauge for 
nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, 
PartitionLevelStreamConfig streamConfig) {
+  public void ensureAllPartitionsConsuming(TableConfig tableConfig,
+      PartitionLevelStreamConfig streamConfig, boolean 
recreateDeletedConsumingSegment) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -877,7 +885,8 @@ public class PinotLLCRealtimeSegmentManager {
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
         streamConfig.setOffsetCriteria(originalOffsetCriteria);
-        return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList);
+        return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList,
+            recreateDeletedConsumingSegment);
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", 
realtimeTableName);
         return idealState;
@@ -1032,7 +1041,8 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, 
PartitionLevelStreamConfig streamConfig,
-      IdealState idealState, List<PartitionGroupMetadata> 
newPartitionGroupMetadataList) {
+      IdealState idealState, List<PartitionGroupMetadata> 
newPartitionGroupMetadataList,
+      boolean recreateDeletedConsumingSegment) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
@@ -1061,10 +1071,13 @@ public class PinotLLCRealtimeSegmentManager {
     //    b. update current segment in idealstate to ONLINE (only if partition 
is present in newPartitionGroupMetadata)
     //    c. add new segment in idealstate to CONSUMING on the hosts (only if 
partition is present in
     //    newPartitionGroupMetadata)
-    // 2. The latest metadata is IN_PROGRESS, but segment is not there in 
idealstate.
+    // 2. The latest metadata is DONE, all replicas are ONLINE, and recreateDeletedConsumingSegment is true.
+    //    a. Create metadata for new IN_PROGRESS segment with startOffset set to latest segment's end offset.
+    //    b. Add the newly created segment to idealstate with segment state set to CONSUMING.
+    // 3. The latest metadata is IN_PROGRESS, but segment is not there in 
idealstate.
     //    a. change prev segment to ONLINE in idealstate
     //    b. add latest segment to CONSUMING in idealstate.
-    // 3. All instances of a segment are in OFFLINE state.
+    // 4. All instances of a segment are in OFFLINE state.
     //    a. Create a new segment (with the next seq number)
     //       and restart consumption from the same offset (if possible) or a 
newer offset (if realtime stream does
     //       not have the same offset).
@@ -1117,43 +1130,29 @@ public class PinotLLCRealtimeSegmentManager {
           // CONSUMING segment
           // 1. all replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment 
marked itself OFFLINE during
           // consumption for some reason
-          // 2. all replicas ONLINE and metadata DONE - Resolved in 
https://github.com/linkedin/pinot/pull/2890
+          // 2. all replicas ONLINE and metadata DONE
           // 3. we should never end up with some replicas ONLINE and some 
OFFLINE.
           if (isAllInstancesInState(instanceStateMap, 
SegmentStateModel.OFFLINE)) {
             LOGGER.info("Repairing segment: {} which is OFFLINE for all 
instances in IdealState", latestSegmentName);
-
-            // Create a new segment to re-consume from the previous start 
offset
-            LLCSegmentName newLLCSegmentName = 
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
             StreamPartitionMsgOffset startOffset = 
offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
-            StreamPartitionMsgOffset partitionGroupSmallestOffset =
-                getPartitionGroupSmallestOffset(streamConfig, 
partitionGroupId);
-
-            // Start offset must be higher than the start offset of the stream
-            if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
-              LOGGER.error("Data lost from offset: {} to: {} for partition: {} 
of table: {}", startOffset,
-                  partitionGroupSmallestOffset, partitionGroupId, 
realtimeTableName);
-              _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-              startOffset = partitionGroupSmallestOffset;
-            }
-
-            CommittingSegmentDescriptor committingSegmentDescriptor =
-                new CommittingSegmentDescriptor(latestSegmentName, 
startOffset.toString(), 0);
-            createNewSegmentZKMetadata(tableConfig, streamConfig, 
newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas,
-                newPartitionGroupMetadataList);
-            String newSegmentName = newLLCSegmentName.getSegmentName();
-            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
null, newSegmentName, segmentAssignment,
-                instancePartitionsMap);
+            createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
+                partitionGroupId, newPartitionGroupMetadataList, 
instancePartitions, instanceStatesMap,
+                segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
-              // If we get here, that means in IdealState, the latest segment 
has no CONSUMING replicas, but has
-              // replicas
-              // not OFFLINE. That is an unexpected state which cannot be 
fixed by the validation manager currently. In
-              // that case, we need to either extend this part to handle the 
state, or prevent segments from getting
-              // into
-              // such state.
-              LOGGER
-                  .error("Got unexpected instance state map: {} for segment: 
{}", instanceStateMap, latestSegmentName);
+              if (recreateDeletedConsumingSegment && 
Status.DONE.equals(latestSegmentZKMetadata.getStatus())
+                  && isAllInstancesInState(instanceStateMap, 
SegmentStateModel.ONLINE)) {
+                // If we get here, that means in IdealState, the latest 
segment has all replicas ONLINE.
+                // Create a new IN_PROGRESS segment in PROPERTYSTORE,
+                // add it as CONSUMING segment to IDEALSTATE.
+                StreamPartitionMsgOffset startOffset = 
offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+                createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
+                    partitionGroupId, newPartitionGroupMetadataList, 
instancePartitions,
+                    instanceStatesMap, segmentAssignment, 
instancePartitionsMap, startOffset);
+              } else {
+                LOGGER.error("Got unexpected instance state map: {} for 
segment: {}",
+                    instanceStateMap, latestSegmentName);
+              }
             }
             // else, the partition group has reached end of life. This is an 
acceptable state
           }
@@ -1211,6 +1210,36 @@ public class PinotLLCRealtimeSegmentManager {
     return idealState;
   }
 
+  private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
+      SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId,
+      List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
+      Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    int numPartitions = newPartitionGroupMetadataList.size();
+    LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
+    LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+    StreamPartitionMsgOffset partitionGroupSmallestOffset =
+        getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
+
+    // Start offset must be higher than the start offset of the stream
+    if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
+      LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
+          partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
+      _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+      startOffset = partitionGroupSmallestOffset;
+    }
+
+    CommittingSegmentDescriptor committingSegmentDescriptor =
+        new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
+    createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+        committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions,
+        numReplicas, newPartitionGroupMetadataList);
+    String newSegmentName = newLLCSegmentName.getSegmentName();
+    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
+        instancePartitionsMap);
+  }
+
   private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
     OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
     streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 2422b1ef1f..858b3cae46 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.validation;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -45,7 +46,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
   }
 
   @Override
-  protected Context preprocess() {
+  protected Context preprocess(Properties periodicTaskProperties) {
     Context context = new Context();
     context._instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
     return context;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 237924add0..5831c0617a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.validation;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -55,6 +56,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
 
+  public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
+
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
@@ -69,7 +72,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   }
 
   @Override
-  protected Context preprocess() {
+  protected Context preprocess(Properties periodicTaskProperties) {
     Context context = new Context();
     // Run segment level validation only if certain time has passed after previous run
     long currentTimeMs = System.currentTimeMillis();
@@ -79,6 +82,9 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       context._runSegmentLevelValidation = true;
       _lastSegmentLevelValidationRunTimeMs = currentTimeMs;
     }
+    context._recreateDeletedConsumingSegment =
+        Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
+
     return context;
   }
 
@@ -100,7 +106,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
           IngestionConfigUtils.getStreamConfigMap(tableConfig));
       if (streamConfig.hasLowLevelConsumerType()) {
-        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig);
+        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
+            streamConfig, context._recreateDeletedConsumingSegment);
       }
     }
   }
@@ -174,6 +181,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
 
   public static final class Context {
     private boolean _runSegmentLevelValidation;
+    private boolean _recreateDeletedConsumingSegment;
   }
 
   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 6d66b2bdab..23a5dada97 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
@@ -70,9 +71,9 @@ public class ControllerPeriodicTaskTest {
     }
 
     @Override
-    public void processTables(List<String> tableNamesWithType) {
+    public void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
       _processTablesCalled.set(true);
-      super.processTables(tableNamesWithType);
+      super.processTables(tableNamesWithType, periodicTaskProperties);
     }
 
     @Override
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d7dc14027b..4f7acf3773 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -888,7 +888,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
     try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false);
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -1115,7 +1115,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()));
+          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to