This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fe72e939e2 Make clientId to be unique for PartitionGroupMetadataFetcher (#15393) fe72e939e2 is described below commit fe72e939e2230049a2b11fb199f69a313a5be7a6 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Mon Mar 31 23:40:13 2025 -0700 Make clientId to be unique for PartitionGroupMetadataFetcher (#15393) --- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 4 ++-- .../realtime/RealtimeConsumptionRateManager.java | 3 ++- .../spi/stream/PartitionGroupMetadataFetcher.java | 19 ++++++++++--------- .../pinot/spi/stream/StreamConsumerFactory.java | 10 ++++++++++ .../pinot/spi/stream/StreamMetadataProvider.java | 2 +- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 9e8bff5f5d..7d7393a06c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -998,9 +998,9 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting Set<Integer> getPartitionIds(StreamConfig streamConfig) throws Exception { - String clientId = + String clientId = StreamConsumerFactory.getUniqueClientId( PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" - + streamConfig.getTopicName(); + + streamConfig.getTopicName()); StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig); try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) { return metadataProvider.fetchPartitionIds(5000L); diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java index 0d59899c9e..153c22fd85 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java @@ -201,7 +201,8 @@ public class RealtimeConsumptionRateManager { @VisibleForTesting static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER = streamConfig -> { - String clientId = streamConfig.getTopicName() + "-consumption.rate.manager"; + String clientId = + StreamConsumerFactory.getUniqueClientId(streamConfig.getTopicName() + "-consumption.rate.manager"); StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig); try (StreamMetadataProvider streamMetadataProvider = factory.createStreamMetadataProvider(clientId)) { return streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 158e28ce72..30cbe8bd63 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -78,15 +78,16 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { partitionGroupConsumptionStatus.getPartitionGroupId()) == index) .collect(Collectors.toList()); try ( - StreamMetadataProvider streamMetadataProvider = - streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, - _streamConfigs.get(i), - 
topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( - metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( - metadata.getPartitionGroupId(), index), - metadata.getStartOffset())).collect(Collectors.toList()) + StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( + StreamConsumerFactory.getUniqueClientId(clientId))) { + _newPartitionGroupMetadataList.addAll( + streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId), + _streamConfigs.get(i), + topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( + metadata -> new PartitionGroupMetadata( + IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( + metadata.getPartitionGroupId(), index), + metadata.getStartOffset())).collect(Collectors.toList()) ); if (_exception != null) { // We had at least one failure, but succeeded now. Log an info diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index a8c4d22cc3..242cc9491b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -19,12 +19,15 @@ package org.apache.pinot.spi.stream; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** * Factory for a stream which provides a consumer and a metadata provider for the stream */ public abstract class StreamConsumerFactory { + private static final AtomicInteger CLIENT_ID_SEQ = new AtomicInteger(0); + protected StreamConfig _streamConfig; /** @@ -72,4 +75,11 @@ public abstract class StreamConsumerFactory { String groupId) { throw new UnsupportedOperationException(); } + + public static String getUniqueClientId(String prefix) { + if (prefix == null) { + return 
String.valueOf(CLIENT_ID_SEQ.getAndIncrement()); + } + return prefix + "-" + CLIENT_ID_SEQ.getAndIncrement(); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 052993a6d0..64770d3f83 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -89,7 +89,7 @@ public interface StreamMetadataProvider extends Closeable { StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); for (int i = partitionGroupConsumptionStatuses.size(); i < partitionCount; i++) { try (StreamMetadataProvider partitionMetadataProvider = streamConsumerFactory.createPartitionMetadataProvider( - clientId, i)) { + StreamConsumerFactory.getUniqueClientId(clientId), i)) { StreamPartitionMsgOffset streamPartitionMsgOffset = partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis); newPartitionGroupMetadataList.add(new PartitionGroupMetadata(i, streamPartitionMsgOffset)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org