Copilot commented on code in PR #15957:
URL: https://github.com/apache/pinot/pull/15957#discussion_r2131633374


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -128,19 +128,20 @@ public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) {
    * Fetches the streamConfig from the list of streamConfigs according to the partition id.
    */
   public static Map<String, String> getStreamConfigMap(TableConfig tableConfig, int partitionId) {

Review Comment:
   Check that tableConfig.getTableType() == TableType.REALTIME before reading 
the stream config maps, so the method fails fast when it is called for an 
OFFLINE table.
   ```suggestion
     public static Map<String, String> getStreamConfigMap(TableConfig tableConfig, int partitionId) {
       Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
           "StreamConfig is only applicable for REALTIME tables. Table: %s is of type: %s",
           tableConfig.getTableName(), tableConfig.getTableType());
   ```
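
To illustrate the fail-fast guard suggested above, here is a minimal, self-contained sketch. It is not Pinot code: `TableType` and `checkRealtime` are hypothetical stand-ins, and a plain `IllegalStateException` replaces Guava's `Preconditions.checkState` (which throws the same exception type) so the example runs without extra dependencies.

```java
public class StreamConfigGuardSketch {
  // Hypothetical stand-in for Pinot's TableType enum.
  enum TableType { OFFLINE, REALTIME }

  // Hypothetical helper mirroring the suggested precondition.
  // Throws IllegalStateException when the table is not REALTIME.
  static void checkRealtime(String tableName, TableType tableType) {
    if (tableType != TableType.REALTIME) {
      throw new IllegalStateException(String.format(
          "StreamConfig is only applicable for REALTIME tables. Table: %s is of type: %s",
          tableName, tableType));
    }
  }

  public static void main(String[] args) {
    // A REALTIME table passes the guard silently.
    checkRealtime("events_REALTIME", TableType.REALTIME);

    // An OFFLINE table is rejected before any stream config is read.
    try {
      checkRealtime("events_OFFLINE", TableType.OFFLINE);
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

The point of the guard is that a caller passing an OFFLINE table gets a clear error naming the table, instead of a later failure when the stream config lookup finds nothing.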



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -69,39 +65,79 @@ public Exception getException() {
   public Boolean call()
       throws Exception {
     _newPartitionGroupMetadataList.clear();
-    for (int i = 0; i < _streamConfigs.size(); i++) {
-      String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
-          + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i);
-      StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
-      final int index = i;
+    return _streamConfigs.size() == 1 ? fetchSingleStream() : fetchMultipleStreams();
+  }
+
+  private Boolean fetchSingleStream()
+      throws Exception {
+    StreamConfig streamConfig = _streamConfigs.get(0);
+    String topicName = streamConfig.getTopicName();
+    String clientId =
+        PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + topicName;
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
+        StreamConsumerFactory.getUniqueClientId(clientId))) {
+      _newPartitionGroupMetadataList.addAll(
+          streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),

Review Comment:
   [nitpick] The unique client ID is generated twice: once when the metadata 
provider is created and again in the computePartitionGroupMetadata call. 
Extract it to a local variable so both calls use the same ID and the code is 
easier to read.
   ```suggestion
       String uniqueClientId = StreamConsumerFactory.getUniqueClientId(clientId);
       StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
       try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
           uniqueClientId)) {
         _newPartitionGroupMetadataList.addAll(
             streamMetadataProvider.computePartitionGroupMetadata(uniqueClientId,
   ```
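
The following sketch shows why generating the ID twice can produce inconsistent values, as the nitpick above points out. It is a standalone illustration, not Pinot code: the local `getUniqueClientId` is a hypothetical stand-in that is assumed to append a random suffix to the prefix, and the table and topic names are made up.

```java
import java.util.UUID;

public class UniqueClientIdSketch {
  // Hypothetical stand-in for StreamConsumerFactory.getUniqueClientId.
  // Assumption: each call appends a fresh random suffix to the prefix.
  static String getUniqueClientId(String prefix) {
    return prefix + "-" + UUID.randomUUID();
  }

  public static void main(String[] args) {
    // Made-up client id prefix for illustration only.
    String clientId = "PartitionGroupMetadataFetcher-myTable_REALTIME-myTopic";

    // Calling the generator at each use site yields two different ids,
    // so the provider and the metadata call would not share one id.
    String idForProvider = getUniqueClientId(clientId);
    String idForMetadataCall = getUniqueClientId(clientId);
    System.out.println("Same id when generated twice: " + idForProvider.equals(idForMetadataCall));

    // Generating once and reusing the local variable keeps both uses consistent.
    String uniqueClientId = getUniqueClientId(clientId);
    System.out.println("Provider id:      " + uniqueClientId);
    System.out.println("Metadata call id: " + uniqueClientId);
  }
}
```

If the real method behaves like this stand-in, hoisting the call into a local variable makes both uses refer to the same ID, and it also saves one call.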



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

