This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c7e835f961 Realtime table consumption resume API (#8663) c7e835f961 is described below commit c7e835f9613d7bcc94c5e68c180dfb225b5acc1b Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Tue May 24 22:32:36 2022 +0530 Realtime table consumption resume API (#8663) --- .../common/messages/RunPeriodicTaskMessage.java | 13 ++- ...ControllerUserDefinedMessageHandlerFactory.java | 13 ++- .../pinot/controller/api/resources/Constants.java | 2 + ...PinotControllerPeriodicTaskRestletResource.java | 38 +------- .../api/resources/PinotRealtimeTableResource.java | 65 +++++++++++++ .../controller/helix/SegmentStatusChecker.java | 3 +- .../helix/core/PinotHelixResourceManager.java | 38 ++++++++ .../helix/core/minion/PinotTaskManager.java | 3 +- .../core/periodictask/ControllerPeriodicTask.java | 13 ++- .../realtime/PinotLLCRealtimeSegmentManager.java | 101 +++++++++++++-------- .../BrokerResourceValidationManager.java | 3 +- .../RealtimeSegmentValidationManager.java | 12 ++- .../periodictask/ControllerPeriodicTaskTest.java | 5 +- .../PinotLLCRealtimeSegmentManagerTest.java | 4 +- 14 files changed, 225 insertions(+), 88 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java index 11ee6cf56c..4e3178fe41 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.messages; +import java.util.Map; import java.util.UUID; import org.apache.helix.ZNRecord; import org.apache.helix.model.Message; @@ -32,13 +33,15 @@ public class RunPeriodicTaskMessage extends Message { private static final String PERIODIC_TASK_REQUEST_ID = "requestId"; private static final String PERIODIC_TASK_NAME_KEY = "taskName"; private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType"; + private static final String TASK_PROPERTIES = "taskProperties"; /** * @param taskRequestId Request Id that will be appended to log messages. * @param periodicTaskName Name of the task that will be run. * @param tableNameWithType Table (names with type suffix) on which task will run. */ - public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, String tableNameWithType) { + public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, String tableNameWithType, + Map<String, String> taskProperties) { super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); setMsgSubType(RUN_PERIODIC_TASK_MSG_SUB_TYPE); setExecutionTimeout(-1); @@ -46,6 +49,10 @@ public class RunPeriodicTaskMessage extends Message { znRecord.setSimpleField(PERIODIC_TASK_REQUEST_ID, taskRequestId); znRecord.setSimpleField(PERIODIC_TASK_NAME_KEY, periodicTaskName); znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType); + + if (taskProperties != null) { + znRecord.setMapField(TASK_PROPERTIES, taskProperties); + } } public RunPeriodicTaskMessage(Message message) { @@ -63,4 +70,8 @@ public class RunPeriodicTaskMessage extends Message { public String getTableNameWithType() { return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY); } + + public Map<String, String> getTaskProperties() { + return getRecord().getMapField(TASK_PROPERTIES); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java index fd9dfb843b..8adbaff9c6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller; +import java.util.Map; import java.util.Properties; import org.apache.helix.NotificationContext; import org.apache.helix.messaging.handling.HelixTaskResult; @@ -71,6 +72,7 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle private final String _periodicTaskRequestId; private final String _periodicTaskName; private final String _tableNameWithType; + private final Map<String, String> _taskProperties; private final PeriodicTaskScheduler _periodicTaskScheduler; RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, NotificationContext context, @@ -79,6 +81,7 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle _periodicTaskRequestId = message.getPeriodicTaskRequestId(); _periodicTaskName = message.getPeriodicTaskName(); _tableNameWithType = message.getTableNameWithType(); + _taskProperties = message.getTaskProperties(); _periodicTaskScheduler = periodicTaskScheduler; } @@ -88,7 +91,8 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by executing task {}", _periodicTaskRequestId, _periodicTaskName); _periodicTaskScheduler - .scheduleNow(_periodicTaskName, createTaskProperties(_periodicTaskRequestId, _tableNameWithType)); + .scheduleNow(_periodicTaskName, createTaskProperties(_periodicTaskRequestId, _tableNameWithType, + _taskProperties)); HelixTaskResult helixTaskResult = new HelixTaskResult(); helixTaskResult.setSuccess(true); return helixTaskResult; @@ -99,7 +103,8 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle LOGGER.error("[TaskRequestId: {}] Message handling error.", _periodicTaskRequestId, e); } - private static Properties createTaskProperties(String periodicTaskRequestId, String tableNameWithType) { + private static Properties createTaskProperties(String periodicTaskRequestId, String tableNameWithType, + Map<String, String> taskProperties) { Properties periodicTaskParameters = new Properties(); if (periodicTaskRequestId != null) { periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID, periodicTaskRequestId); @@ -109,6 +114,10 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, tableNameWithType); } + if (taskProperties != null) { + taskProperties.forEach(periodicTaskParameters::setProperty); + } + return periodicTaskParameters; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index 109bdfc565..513b12f9d3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -51,6 +51,8 @@ public class Constants { public static final String PERIODIC_TASK_TAG = "PeriodicTask"; public static final String UPSERT_RESOURCE_TAG = "Upsert"; + public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = "RealtimeSegmentValidationManager"; + public static TableType validateTableType(String tableTypeStr) { if (StringUtils.isBlank(tableTypeStr)) { return null; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java index 99a597bbc4..8566d4509c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java @@ -22,7 +22,6 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import java.util.List; -import java.util.UUID; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -31,13 +30,9 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; -import org.apache.helix.InstanceType; -import org.apache.pinot.common.messages.RunPeriodicTaskMessage; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; -import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +42,6 @@ import org.slf4j.LoggerFactory; public class PinotControllerPeriodicTaskRestletResource { private static final Logger LOGGER = LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class); - private static final String API_REQUEST_ID_PREFIX = "api-"; @Inject PinotHelixResourceManager _pinotHelixResourceManager; @@ -84,33 +78,11 @@ public class PinotControllerPeriodicTaskRestletResource { tableName = matchingTableNamesWithType.get(0); } - // Generate an id for this request by taking first eight characters of a randomly generated UUID. This request id - // is returned to the user and also appended to log messages so that user can locate all log messages associated - // with this PeriodicTask's execution. - String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8); + Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager + .invokeControllerPeriodicTask(tableName, periodicTaskName, null); - LOGGER.info( - "[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}.", - periodicTaskRequestId, periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables"); - - // Create and send message to send to all controllers (including this one) - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); - recipientCriteria.setSelfExcluded(false); - RunPeriodicTaskMessage runPeriodicTaskMessage = - new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName); - - ClusterMessagingService clusterMessagingService = - _pinotHelixResourceManager.getHelixZkManager().getMessagingService(); - int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1); - LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId, - messageCount); - - return "{\"Log Request Id\": \"" + periodicTaskRequestId + "\",\"Controllers notified\":" + (messageCount > 0) - + "}"; + return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft() + + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}"; } @GET diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java new file mode 100644 index 0000000000..c9161592a4 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.util.HashMap; +import java.util.Map; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class PinotRealtimeTableResource { + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @POST + @Path("/tables/{tableName}/resumeConsumption") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resume the consumption of a realtime table", + notes = "Resume the consumption of a realtime table") + public String resumeConsumption( + @ApiParam(value = "Name of the table", required = true) + @PathParam("tableName") String tableName) throws JsonProcessingException { + // TODO: Add util method for invoking periodic tasks + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + Map<String, String> taskProperties = new HashMap<>(); + taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true"); + + Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager + .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties); + + return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft() + + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}"; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index a05dac5f9b..bf9f179925 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -95,7 +96,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh } @Override - protected Context preprocess() { + protected Context preprocess(Properties periodicTaskProperties) { Context context = new Context(); // check if we need to log disabled tables log messages long now = System.currentTimeMillis(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index cccf6d348e..dfd0510ae2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -42,6 +42,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -55,6 +56,7 @@ import javax.ws.rs.core.Response; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; import org.apache.helix.ClusterMessagingService; import org.apache.helix.Criteria; @@ -90,6 +92,7 @@ import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; +import org.apache.pinot.common.messages.RunPeriodicTaskMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; @@ -165,6 +168,7 @@ public class PinotHelixResourceManager { private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); public static final String APPEND = "APPEND"; private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100; + private static final String API_REQUEST_ID_PREFIX = "api-"; // TODO: make this configurable public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes @@ -3560,6 +3564,40 @@ public class PinotHelixResourceManager { return tableConfig.getValidationConfig().getReplicationNumber(); } + /** + * Trigger controller periodic task using helix messaging service + * @param tableName Name of table against which task is to be run + * @param periodicTaskName Task name + * @param taskProperties Extra properties to be passed along + * @return Task id for filtering logs, along with the number of successfully sent messages + */ + public Pair<String, Integer> invokeControllerPeriodicTask(String tableName, String periodicTaskName, + Map<String, String> taskProperties) { + String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8); + + LOGGER.info( + "[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}," + + " with properties {}.\"", periodicTaskRequestId, periodicTaskName, + tableName != null ? " table '" + tableName + "'" : "all tables", taskProperties); + + // Create and send message to send to all controllers (including this one) + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setSessionSpecific(true); + recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + recipientCriteria.setSelfExcluded(false); + RunPeriodicTaskMessage runPeriodicTaskMessage = + new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName, taskProperties); + + ClusterMessagingService clusterMessagingService = getHelixZkManager().getMessagingService(); + int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1); + + LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId, + messageCount); + return Pair.of(periodicTaskRequestId, messageCount); + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 522cc5e077..d7c73b7ac1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -587,7 +588,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } @Override - protected void processTables(List<String> tableNamesWithType) { + protected void processTables(List<String> tableNamesWithType, Properties taskProperties) { scheduleTasks(tableNamesWithType, true); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 439f8be45c..0e737b2e16 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -62,7 +62,6 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { try { // Check if we have a specific table against which this task needs to be run. String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); - // Process the tables that are managed by this controller List<String> tablesToProcess = new ArrayList<>(); List<String> nonLeaderForTables = new ArrayList<>(); @@ -83,7 +82,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { } if (!tablesToProcess.isEmpty()) { - processTables(tablesToProcess); + processTables(tablesToProcess, periodicTaskProperties); } if (!nonLeaderForTables.isEmpty()) { nonLeaderCleanup(nonLeaderForTables); @@ -103,10 +102,10 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { * <p> * Override one of this method, {@link #processTable(String)} or {@link #processTable(String, C)}. */ - protected void processTables(List<String> tableNamesWithType) { + protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) { int numTables = tableNamesWithType.size(); LOGGER.info("Processing {} tables in task: {}", numTables, _taskName); - C context = preprocess(); + C context = preprocess(periodicTaskProperties); int numTablesProcessed = 0; for (String tableNameWithType : tableNamesWithType) { if (!isStarted()) { @@ -129,14 +128,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { /** * Can be overridden to provide context before processing the tables. */ - protected C preprocess() { + protected C preprocess(Properties periodicTaskProperties) { return null; } /** * Processes the given table. * <p> - * Override one of this method, {@link #processTable(String)} or {@link #processTables(List)}. + * Override one of this method, {@link #processTable(String)} or {@link #processTables(List, Properties)}. */ protected void processTable(String tableNameWithType, C context) { processTable(tableNameWithType); @@ -145,7 +144,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { /** * Processes the given table. * <p> - * Override one of this method, {@link #processTable(String, C)} or {@link #processTables(List)}. + * Override one of this method, {@link #processTable(String, C)} or {@link #processTables(List, Properties)}. */ protected void processTable(String tableNameWithType) { } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2d07e20be0..1a7428687b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -859,10 +859,18 @@ public class PinotLLCRealtimeSegmentManager { * If the controller fails before step-1, the server will see this as an upload failure, and will re-try. * @param tableConfig * + * If the consuming segment is deleted by user intentionally or by mistake: + * Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status + * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE. + * If so, it should create a new CONSUMING segment for the partition. + * (this operation is done only if @param recreateDeletedConsumingSegment is set to true, + * which means it's manually triggered by admin not by automatic periodic task) + * * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and * reset it to 0 at the end of validateLLC */ - public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) { + public void ensureAllPartitionsConsuming(TableConfig tableConfig, + PartitionLevelStreamConfig streamConfig, boolean recreateDeletedConsumingSegment) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); @@ -877,7 +885,8 @@ public class PinotLLCRealtimeSegmentManager { List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); streamConfig.setOffsetCriteria(originalOffsetCriteria); - return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList); + return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, + recreateDeletedConsumingSegment); } else { LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName); return idealState; @@ -1032,7 +1041,8 @@ public class PinotLLCRealtimeSegmentManager { */ @VisibleForTesting IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, - IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList) { + IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList, + boolean recreateDeletedConsumingSegment) { String realtimeTableName = tableConfig.getTableName(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -1061,10 +1071,13 @@ public class PinotLLCRealtimeSegmentManager { // b. update current segment in idealstate to ONLINE (only if partition is present in newPartitionGroupMetadata) // c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in // newPartitionGroupMetadata) - // 2. The latest metadata is IN_PROGRESS, but segment is not there in idealstate. + // 2. The latest metadata is in DONE state, but the idealstate has no segment in CONSUMING state. + // a. Create metadata for new IN_PROGRESS segment with startOffset set to latest segments' end offset. + // b. Add the newly created segment to idealstate with segment state set to CONSUMING. + // 3. The latest metadata is IN_PROGRESS, but segment is not there in idealstate. // a. change prev segment to ONLINE in idealstate // b. add latest segment to CONSUMING in idealstate. - // 3. All instances of a segment are in OFFLINE state. + // 4. All instances of a segment are in OFFLINE state. // a. Create a new segment (with the next seq number) // and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does // not have the same offset). @@ -1117,43 +1130,29 @@ public class PinotLLCRealtimeSegmentManager { // CONSUMING segment // 1. all replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment marked itself OFFLINE during // consumption for some reason - // 2. all replicas ONLINE and metadata DONE - Resolved in https://github.com/linkedin/pinot/pull/2890 + // 2. all replicas ONLINE and metadata DONE // 3. we should never end up with some replicas ONLINE and some OFFLINE. if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) { LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName); - - // Create a new segment to re-consume from the previous start offset - LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); - StreamPartitionMsgOffset partitionGroupSmallestOffset = - getPartitionGroupSmallestOffset(streamConfig, partitionGroupId); - - // Start offset must be higher than the start offset of the stream - if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) { - LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, - partitionGroupSmallestOffset, partitionGroupId, realtimeTableName); - _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); - startOffset = partitionGroupSmallestOffset; - } - - CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, - committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas, - newPartitionGroupMetadataList); - String newSegmentName = newLLCSegmentName.getSegmentName(); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, - instancePartitionsMap); + createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, + segmentAssignment, instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { - // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has - // replicas - // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In - // that case, we need to either extend this part to handle the state, or prevent segments from getting - // into - // such state. - LOGGER - .error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName); + if (recreateDeletedConsumingSegment && Status.DONE.equals(latestSegmentZKMetadata.getStatus()) + && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) { + // If we get here, that means in IdealState, the latest segment has all replicas ONLINE. + // Create a new IN_PROGRESS segment in PROPERTYSTORE, + // add it as CONSUMING segment to IDEALSTATE. + StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset()); + createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + partitionGroupId, newPartitionGroupMetadataList, instancePartitions, + instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset); + } else { + LOGGER.error("Got unexpected instance state map: {} for segment: {}", + instanceStateMap, latestSegmentName); + } } // else, the partition group has reached end of life. This is an acceptable state } @@ -1211,6 +1210,36 @@ public class PinotLLCRealtimeSegmentManager { return idealState; } + private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, + SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId, + List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, + Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + int numPartitions = newPartitionGroupMetadataList.size(); + LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName()); + LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); + StreamPartitionMsgOffset partitionGroupSmallestOffset = + getPartitionGroupSmallestOffset(streamConfig, partitionGroupId); + + // Start offset must be higher than the start offset of the stream + if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) { + LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, + partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName()); + _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); + startOffset = partitionGroupSmallestOffset; + } + + CommittingSegmentDescriptor committingSegmentDescriptor = + new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0); + createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, + committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, + numReplicas, newPartitionGroupMetadataList); + String newSegmentName = newLLCSegmentName.getSegmentName(); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, + instancePartitionsMap); + } + private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) { OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java index 2422b1ef1f..858b3cae46 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.validation; import java.util.List; +import java.util.Properties; import java.util.Set; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -45,7 +46,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok } @Override - protected Context preprocess() { + protected Context preprocess(Properties periodicTaskProperties) { Context context = new Context(); context._instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs(); return context; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 237924add0..5831c0617a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.validation; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -55,6 +56,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final int _segmentLevelValidationIntervalInSeconds; private long _lastSegmentLevelValidationRunTimeMs = 0L; + public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment"; + public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) { @@ -69,7 +72,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } @Override - protected Context preprocess() { + protected Context preprocess(Properties periodicTaskProperties) { Context context = new Context(); // Run segment level validation only if certain time has passed after previous run long currentTimeMs = System.currentTimeMillis(); @@ -79,6 +82,9 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea context._runSegmentLevelValidation = true; _lastSegmentLevelValidationRunTimeMs = currentTimeMs; } + context._recreateDeletedConsumingSegment = + Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY)); + return context; } @@ -100,7 +106,8 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); if (streamConfig.hasLowLevelConsumerType()) { - _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig); + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, + streamConfig, context._recreateDeletedConsumingSegment); } } } @@ -174,6 +181,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public static final class Context { private boolean _runSegmentLevelValidation; + private boolean _recreateDeletedConsumingSegment; } @VisibleForTesting diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 6d66b2bdab..23a5dada97 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.periodictask; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -70,9 +71,9 @@ public class ControllerPeriodicTaskTest { } @Override - public void processTables(List<String> tableNamesWithType) { + public void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) { _processTablesCalled.set(true); - super.processTables(tableNamesWithType); + super.processTables(tableNamesWithType, periodicTaskProperties); } @Override diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index d7dc14027b..4f7acf3773 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -888,7 +888,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Expected } try { - segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig); + segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false); fail(); } catch (IllegalStateException e) { // Expected @@ -1115,7 +1115,7 @@ public class PinotLLCRealtimeSegmentManagerTest { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList())); + getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org