npawar commented on code in PR #8663: URL: https://github.com/apache/pinot/pull/8663#discussion_r872475672
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } } + + /** + * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption + * @param tableName + */ + public void resumeRealtimeTableConsumption(String tableName) { + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig, + InstancePartitionsType.CONSUMING); + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + IdealState idealState = + HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfig); + // Read the smallest offset when a new partition is detected + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + + int numPartitionGroups = newPartitionGroupMetadataList.size(); + Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName); + for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { Review Comment: I see. But it's not going to be a generic resume consumption Api unless all the other cases from ensureAllPartitions are also included. Now we have two ways to resume consumption, both doing only partial cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org