saurabhd336 commented on code in PR #8663: URL: https://github.com/apache/pinot/pull/8663#discussion_r876588100
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig, + InstancePartitionsType.CONSUMING); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + IdealState idealState = + HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfig); + // Read the smallest offset when a new partition is detected + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + 
streamConfig.setOffsetCriteria(originalOffsetCriteria); + + int numPartitionGroups = newPartitionGroupMetadataList.size(); + Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { + int partitionId = partitionGroupMetadata.getPartitionGroupId(); + if (partitionToLatestSegment.containsKey(partitionId) + && idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName()) + .containsValue(SegmentStateModel.CONSUMING)) { + // IN_PROGRESS segment already exists, NO-OP + LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already exists", partitionId); + } else { + // Step 1: Create PROPERTYSTORE nodes for each partitionId (IN_PROGRESS) + String offset = partitionGroupMetadata.getStartOffset().toString(); + long newSegmentCreationTimeMs = System.currentTimeMillis(); + + SegmentZKMetadata latestSegmentMeta = partitionToLatestSegment.get(partitionId); + int seqNumber = latestSegmentMeta == null ? 
STARTING_SEQUENCE_NUMBER + : new LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1; + + LLCSegmentName newLLCSegment = new LLCSegmentName(tableName, partitionId, seqNumber, newSegmentCreationTimeMs); + SegmentZKMetadata newSegmentZKMetadata = new SegmentZKMetadata(newLLCSegment.getSegmentName()); + newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs()); + newSegmentZKMetadata.setStartOffset(offset); + newSegmentZKMetadata.setNumReplicas(numReplicas); + newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); + SegmentPartitionMetadata segmentPartitionMetadata = + getPartitionMetadataFromTableConfig(tableConfig, partitionId); + if (segmentPartitionMetadata != null) { + newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata); + } + + FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager + .getFlushThresholdUpdater(streamConfig); + CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, offset, 0); + flushThresholdUpdater.updateFlushThreshold(streamConfig, + newSegmentZKMetadata, committingSegmentDescriptor, null, + getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas), + newPartitionGroupMetadataList); + + _helixResourceManager.getPropertyStore().set( Review Comment: I was trying to go through Helix's `updateIdealState` function, which gets called by `ensureAllPartitionsConsuming`, and it looks like the idealState update has support for handling concurrent writes (CAS-like implementation). So we may not need synchronization after all? Even if two threads / processes try to concurrently update the state, it'll get serialized and hence will be safe. CC: @Jackie-Jiang for more inputs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org