npawar commented on code in PR #8663: URL: https://github.com/apache/pinot/pull/8663#discussion_r871898206
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); Review Comment: tableConfig.getTableName will always give you tableNameWithType ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig, + InstancePartitionsType.CONSUMING); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + IdealState idealState = + HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + + 
PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfig); + // Read the smallest offset when a new partition is detected + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + + int numPartitionGroups = newPartitionGroupMetadataList.size(); + Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { Review Comment: are you intentionally including all new partition groups, instead of just the ones that were deleted and user wants to restore? 
I don't think it's a good idea to introduce yet another place where all new partition groups are read to start consumption (it already happens in ensureAllPartitionsConsuming) ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig, + InstancePartitionsType.CONSUMING); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + IdealState idealState = + HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfig); + // Read the smallest offset when a new partition is detected + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + 
List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + + int numPartitionGroups = newPartitionGroupMetadataList.size(); + Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { + int partitionId = partitionGroupMetadata.getPartitionGroupId(); + if (partitionToLatestSegment.containsKey(partitionId) + && idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName()) + .containsValue(SegmentStateModel.CONSUMING)) { + // IN_PROGRESS segment already exists, NO-OP + LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already exists", partitionId); + } else { + // Step 1: Create PROPERTYSTORE nodes for each partitionId (IN_PROGRESS) + String offset = partitionGroupMetadata.getStartOffset().toString(); + long newSegmentCreationTimeMs = System.currentTimeMillis(); + + SegmentZKMetadata latestSegmentMeta = partitionToLatestSegment.get(partitionId); + int seqNumber = latestSegmentMeta == null ? 
STARTING_SEQUENCE_NUMBER + : new LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1; + + LLCSegmentName newLLCSegment = new LLCSegmentName(tableName, partitionId, seqNumber, newSegmentCreationTimeMs); + SegmentZKMetadata newSegmentZKMetadata = new SegmentZKMetadata(newLLCSegment.getSegmentName()); + newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs()); + newSegmentZKMetadata.setStartOffset(offset); + newSegmentZKMetadata.setNumReplicas(numReplicas); + newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); + SegmentPartitionMetadata segmentPartitionMetadata = + getPartitionMetadataFromTableConfig(tableConfig, partitionId); + if (segmentPartitionMetadata != null) { + newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata); + } + + FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager + .getFlushThresholdUpdater(streamConfig); + CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, offset, 0); + flushThresholdUpdater.updateFlushThreshold(streamConfig, + newSegmentZKMetadata, committingSegmentDescriptor, null, + getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas), + newPartitionGroupMetadataList); + + _helixResourceManager.getPropertyStore().set( Review Comment: we have to ensure that this change plays nicely with the code in ensureAllPartitionsConsuming. That method also creates new zkMetadata and idealState entry, and these 2 should not interfere with each other. 
You'll have to add some synchronization on table name between the 2 methods ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig, + InstancePartitionsType.CONSUMING); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + IdealState idealState = + HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfig); + // Read the smallest offset when a new partition is detected + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); Review Comment: for the partitions that we will restore, we should not start reading from smallest offset. 
We should use the end offset of the previous completed segment if it is available, and fall back to the smallest offset only if we don't have a previous segment (i.e. segment __0 itself was deleted). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org