Copilot commented on code in PR #15957: URL: https://github.com/apache/pinot/pull/15957#discussion_r2131633374
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -128,19 +128,20 @@ public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) {
    * Fetches the streamConfig from the list of streamConfigs according to the partition id.
    */
   public static Map<String, String> getStreamConfigMap(TableConfig tableConfig, int partitionId) {

Review Comment:
   Add a check that tableConfig.getTableType() == TableType.REALTIME before using streamConfigMaps to prevent accidental use for OFFLINE tables.
   ```suggestion
     public static Map<String, String> getStreamConfigMap(TableConfig tableConfig, int partitionId) {
       Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
           "StreamConfig is only applicable for REALTIME tables. Table: %s is of type: %s",
           tableConfig.getTableName(), tableConfig.getTableType());
   ```

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -69,39 +65,79 @@ public Exception getException() {
   public Boolean call()
       throws Exception {
     _newPartitionGroupMetadataList.clear();
-    for (int i = 0; i < _streamConfigs.size(); i++) {
-      String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
-          + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i);
-      StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
-      final int index = i;
+    return _streamConfigs.size() == 1 ?
fetchSingleStream() : fetchMultipleStreams();
+  }
+
+  private Boolean fetchSingleStream()
+      throws Exception {
+    StreamConfig streamConfig = _streamConfigs.get(0);
+    String topicName = streamConfig.getTopicName();
+    String clientId =
+        PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + topicName;
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
+        StreamConsumerFactory.getUniqueClientId(clientId))) {
+      _newPartitionGroupMetadataList.addAll(
+          streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),

Review Comment:
   [nitpick] The unique client ID is generated twice: once in this call and once in the provider creation. Extract it to a local variable to avoid inconsistent IDs and improve readability.
   ```suggestion
       String uniqueClientId = StreamConsumerFactory.getUniqueClientId(clientId);
       StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
       try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
           uniqueClientId)) {
         _newPartitionGroupMetadataList.addAll(
             streamMetadataProvider.computePartitionGroupMetadata(uniqueClientId,
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org