This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a273af0262 Add capabilities to ingest from another stream without disabling the realtime table (#9289) a273af0262 is described below commit a273af026221be6a0d1eb6268f091429ce3fac39 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Mon Aug 29 17:44:51 2022 -0700 Add capabilities to ingest from another stream without disabling the realtime table (#9289) --- .../api/resources/PinotRealtimeTableResource.java | 20 ++-- .../realtime/PinotLLCRealtimeSegmentManager.java | 113 +++++++++++++-------- .../RealtimeSegmentValidationManager.java | 10 +- .../PinotLLCRealtimeSegmentManagerTest.java | 5 +- 4 files changed, 98 insertions(+), 50 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 7bf4ac4235..715d65bede 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -31,6 +31,7 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -61,8 +62,7 @@ public class PinotRealtimeTableResource { @POST @Path("/tables/{tableName}/pauseConsumption") @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Pause consumption of a realtime table", - notes = "Pause the consumption of a realtime table") + @ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table") public Response pauseConsumption( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) 
{ String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); @@ -77,14 +77,22 @@ public class PinotRealtimeTableResource { @POST @Path("/tables/{tableName}/resumeConsumption") @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Resume consumption of a realtime table", - notes = "Resume the consumption for a realtime table") + @ApiOperation(value = "Resume consumption of a realtime table", notes = + "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets " + + "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the " + + "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are " + + "picked to minimize the data loss.") public Response resumeConsumption( - @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) { + throw new ControllerApplicationException(LOGGER, + String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST); + } try { - return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build(); + return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build(); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 5cec6b8c4b..762ffdc421 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -873,7 +873,7 @@ public class PinotLLCRealtimeSegmentManager { * which means it's manually triggered by admin not by automatic periodic task) */ public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, - boolean recreateDeletedConsumingSegment) { + boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); @@ -881,17 +881,20 @@ public class PinotLLCRealtimeSegmentManager { assert idealState != null; boolean isTableEnabled = idealState.isEnabled(); boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; if (isTableEnabled && !isTablePaused) { List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - getPartitionGroupConsumptionStatusList(idealState, streamConfig); - // Read the smallest offset when a new partition is detected + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfig); OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); - streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + // Read the smallest offset when a new partition is detected + streamConfig.setOffsetCriteria(offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); streamConfig.setOffsetCriteria(originalOffsetCriteria); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, - recreateDeletedConsumingSegment); + recreateDeletedConsumingSegment, offsetCriteria); } else { LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", realtimeTableName, isTableEnabled, isTablePaused); @@ -1052,7 +1055,7 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList, - boolean recreateDeletedConsumingSegment) { + boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) { String realtimeTableName = tableConfig.getTableName(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -1074,6 +1077,16 @@ public class PinotLLCRealtimeSegmentManager { // Get the latest segment ZK metadata for each partition Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); + // create a map of <partition id, start offset> using data already fetched in newPartitionGroupMetadataList + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>(); + for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) { + partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + } + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null; + if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) { + partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset; 
+ } + // Walk over all partitions that we have metadata for, and repair any partitions necessary. // Possible things to repair: // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING: @@ -1144,10 +1157,18 @@ public class PinotLLCRealtimeSegmentManager { // 3. we should never end up with some replicas ONLINE and some OFFLINE. if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) { LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName); - StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); + if (partitionGroupIdToSmallestStreamOffset == null) { + // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset. + // This is to prevent fetching the same info for each and every partition group. + partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); + } + StreamPartitionMsgOffset startOffset = + selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, + partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, + latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, - segmentAssignment, instancePartitionsMap, startOffset); + newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, + instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() @@ -1155,10 +1176,18 @@ public class PinotLLCRealtimeSegmentManager { // If we get here, that means in IdealState, the latest segment has all replicas ONLINE. 
// Create a new IN_PROGRESS segment in PROPERTYSTORE, // add it as CONSUMING segment to IDEALSTATE. - StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset()); + if (partitionGroupIdToSmallestStreamOffset == null) { + // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset. + // This is to prevent fetching the same info for each and every partition group. + partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); + } + StreamPartitionMsgOffset startOffset = + selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, + partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, + latestSegmentZKMetadata.getEndOffset()); createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, - segmentAssignment, instancePartitionsMap, startOffset); + newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, + instancePartitionsMap, startOffset); } else { LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName); @@ -1220,7 +1249,7 @@ public class PinotLLCRealtimeSegmentManager { } private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, - SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId, + SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { @@ -1228,23 +1257,6 @@ public class PinotLLCRealtimeSegmentManager { int numPartitions 
= newPartitionGroupMetadataList.size(); LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName()); LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); - StreamPartitionMsgOffset partitionGroupSmallestOffset = - getPartitionGroupSmallestOffset(streamConfig, partitionGroupId); - - if (partitionGroupSmallestOffset != null) { - // Start offset must be higher than the start offset of the stream - if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) { - LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, - partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName()); - _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); - startOffset = partitionGroupSmallestOffset; - } - } else { - LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId, - tableConfig.getTableName(), startOffset); - _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); - } - CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, @@ -1254,21 +1266,39 @@ public class PinotLLCRealtimeSegmentManager { instancePartitionsMap); } - @Nullable - private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) { + private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List<PartitionGroupMetadata> 
smallestOffsetCriteriaPartitionGroupMetadata = + List<PartitionGroupMetadata> partitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); streamConfig.setOffsetCriteria(originalOffsetCriteria); - StreamPartitionMsgOffset partitionStartOffset = null; - for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) { - if (info.getPartitionGroupId() == partitionGroupId) { - partitionStartOffset = info.getStartOffset(); - break; + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); + for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { + partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + } + return partitionGroupIdToSmallestOffset; + } + + private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId, + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset, + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName, + StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) { + if (offsetCriteria != null) { + // use the fetched offset according to offset criteria + return partitionGroupIdToStartOffset.get(partitionGroupId); + } else { + // use offset from segment ZK metadata + StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr); + StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId); + // Start offset in ZK must be higher than the start offset of the stream + if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) { + LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata, + streamSmallestOffset, partitionGroupId, tableName); + _controllerMetrics.addMeteredTableValue(tableName, 
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); + return streamSmallestOffset; } + return startOffsetInSegmentZkMetadata; } - return partitionStartOffset; } private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { @@ -1448,12 +1478,15 @@ public class PinotLLCRealtimeSegmentManager { * 1) setting "isTablePaused" in ideal states to false and * 2) triggering segment validation job to create new consuming segments in ideal states */ - public PauseStatus resumeConsumption(String tableNameWithType) { + public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) { IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false); // trigger realtime segment validation job to resume consumption Map<String, String> taskProperties = new HashMap<>(); taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true"); + if (offsetCriteria != null) { + taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria); + } _helixResourceManager .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index b772fcabd1..90c23361d1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -34,6 +34,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -55,6 +56,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private long _lastSegmentLevelValidationRunTimeMs = 0L; public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment"; + public static final String OFFSET_CRITERIA = "offsetCriteria"; public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, @@ -82,7 +84,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } context._recreateDeletedConsumingSegment = Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY)); - + String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA); + if (offsetCriteriaStr != null) { + context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr); + } return context; } @@ -106,7 +111,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea if (streamConfig.hasLowLevelConsumerType()) { _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, - context._recreateDeletedConsumingSegment); + context._recreateDeletedConsumingSegment, context._offsetCriteria); } } @@ -178,6 +183,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public static final class Context { private boolean _runSegmentLevelValidation; private boolean _recreateDeletedConsumingSegment; + private OffsetCriteria _offsetCriteria; } @VisibleForTesting diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 14ee8970ec..eee1f2a8a8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -888,7 +888,8 @@ public class PinotLLCRealtimeSegmentManagerTest { // Expected } try { - segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false); + segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false, + null); fail(); } catch (IllegalStateException e) { // Expected @@ -1115,7 +1116,7 @@ public class PinotLLCRealtimeSegmentManagerTest { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false); + getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org