This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe72e939e2 Make clientId to be unique for 
PartitionGroupMetadataFetcher (#15393)
fe72e939e2 is described below

commit fe72e939e2230049a2b11fb199f69a313a5be7a6
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Mon Mar 31 23:40:13 2025 -0700

    Make clientId to be unique for PartitionGroupMetadataFetcher (#15393)
---
 .../core/realtime/PinotLLCRealtimeSegmentManager.java |  4 ++--
 .../realtime/RealtimeConsumptionRateManager.java      |  3 ++-
 .../spi/stream/PartitionGroupMetadataFetcher.java     | 19 ++++++++++---------
 .../pinot/spi/stream/StreamConsumerFactory.java       | 10 ++++++++++
 .../pinot/spi/stream/StreamMetadataProvider.java      |  2 +-
 5 files changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9e8bff5f5d..7d7393a06c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -998,9 +998,9 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   Set<Integer> getPartitionIds(StreamConfig streamConfig)
       throws Exception {
-    String clientId =
+    String clientId = StreamConsumerFactory.getUniqueClientId(
         PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + 
streamConfig.getTableNameWithType() + "-"
-            + streamConfig.getTopicName();
+            + streamConfig.getTopicName());
     StreamConsumerFactory consumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider metadataProvider = 
consumerFactory.createStreamMetadataProvider(clientId)) {
       return metadataProvider.fetchPartitionIds(5000L);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index 0d59899c9e..153c22fd85 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -201,7 +201,8 @@ public class RealtimeConsumptionRateManager {
 
   @VisibleForTesting
   static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER = 
streamConfig -> {
-    String clientId = streamConfig.getTopicName() + 
"-consumption.rate.manager";
+    String clientId =
+        StreamConsumerFactory.getUniqueClientId(streamConfig.getTopicName() + 
"-consumption.rate.manager");
     StreamConsumerFactory factory = 
StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider streamMetadataProvider = 
factory.createStreamMetadataProvider(clientId)) {
       return 
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 158e28ce72..30cbe8bd63 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -78,15 +78,16 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
                       partitionGroupConsumptionStatus.getPartitionGroupId()) 
== index)
               .collect(Collectors.toList());
       try (
-          StreamMetadataProvider streamMetadataProvider =
-              streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-        
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
-            _streamConfigs.get(i),
-            topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000).stream().map(
-            metadata -> new PartitionGroupMetadata(
-                IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
-                    metadata.getPartitionGroupId(), index),
-                metadata.getStartOffset())).collect(Collectors.toList())
+          StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
+              StreamConsumerFactory.getUniqueClientId(clientId))) {
+        _newPartitionGroupMetadataList.addAll(
+            
streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
+                _streamConfigs.get(i),
+                topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000).stream().map(
+                metadata -> new PartitionGroupMetadata(
+                    
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
+                        metadata.getPartitionGroupId(), index),
+                    metadata.getStartOffset())).collect(Collectors.toList())
         );
         if (_exception != null) {
           // We had at least one failure, but succeeded now. Log an info
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index a8c4d22cc3..242cc9491b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -19,12 +19,15 @@
 package org.apache.pinot.spi.stream;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
  * Factory for a stream which provides a consumer and a metadata provider for 
the stream
  */
 public abstract class StreamConsumerFactory {
+  private static final AtomicInteger CLIENT_ID_SEQ = new AtomicInteger(0);
+
   protected StreamConfig _streamConfig;
 
   /**
@@ -72,4 +75,11 @@ public abstract class StreamConsumerFactory {
       String groupId) {
     throw new UnsupportedOperationException();
   }
+
+  public static String getUniqueClientId(String prefix) {
+    if (prefix == null) {
+      return String.valueOf(CLIENT_ID_SEQ.getAndIncrement());
+    }
+    return prefix + "-" + CLIENT_ID_SEQ.getAndIncrement();
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 052993a6d0..64770d3f83 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -89,7 +89,7 @@ public interface StreamMetadataProvider extends Closeable {
     StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     for (int i = partitionGroupConsumptionStatuses.size(); i < partitionCount; 
i++) {
       try (StreamMetadataProvider partitionMetadataProvider = 
streamConsumerFactory.createPartitionMetadataProvider(
-          clientId, i)) {
+          StreamConsumerFactory.getUniqueClientId(clientId), i)) {
         StreamPartitionMsgOffset streamPartitionMsgOffset =
             
partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(),
 timeoutMillis);
         newPartitionGroupMetadataList.add(new PartitionGroupMetadata(i, 
streamPartitionMsgOffset));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to