This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8885da0d8b Always use segment partition id as stream partition id for single stream (#15957)
8885da0d8b is described below

commit 8885da0d8b2ff8b3785b0ccfc0ba54cda31027c0
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Jun 6 15:04:57 2025 -0600

    Always use segment partition id as stream partition id for single stream (#15957)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 86 ++++++++++++++++-----
 .../PinotLLCRealtimeSegmentManagerTest.java        |  2 +-
 .../realtime/RealtimeSegmentDataManager.java       | 31 +++++---
 .../stream/PartitionGroupConsumptionStatus.java    | 28 +++----
 .../spi/stream/PartitionGroupMetadataFetcher.java  | 89 +++++++++++++++-------
 .../pinot/spi/utils/IngestionConfigUtils.java      | 25 +++---
 6 files changed, 178 insertions(+), 83 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d2445f3234..a7b04e2766 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -276,20 +276,49 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
-    StreamPartitionMsgOffsetFactory offsetFactory =
-        
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
-    for (Map.Entry<Integer, LLCSegmentName> entry : 
partitionGroupIdToLatestSegment.entrySet()) {
-      int partitionGroupId = entry.getKey();
-      LLCSegmentName llcSegmentName = entry.getValue();
-      SegmentZKMetadata segmentZKMetadata =
-          getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), 
llcSegmentName.getSegmentName());
-      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
-          new PartitionGroupConsumptionStatus(partitionGroupId, 
llcSegmentName.getSequenceNumber(),
-              offsetFactory.create(segmentZKMetadata.getStartOffset()),
-              segmentZKMetadata.getEndOffset() == null ? null : 
offsetFactory.create(segmentZKMetadata.getEndOffset()),
-              segmentZKMetadata.getStatus().toString());
-      partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+    String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
+    int numStreams = streamConfigs.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where 
custom stream might return partition id
+      // larger than 10000.
+      StreamConfig streamConfig = streamConfigs.get(0);
+      StreamPartitionMsgOffsetFactory offsetFactory =
+          
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+      for (Map.Entry<Integer, LLCSegmentName> entry : 
partitionGroupIdToLatestSegment.entrySet()) {
+        int partitionGroupId = entry.getKey();
+        LLCSegmentName llcSegmentName = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName());
+        PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+            new PartitionGroupConsumptionStatus(partitionGroupId, 
llcSegmentName.getSequenceNumber(),
+                offsetFactory.create(segmentZKMetadata.getStartOffset()),
+                segmentZKMetadata.getEndOffset() != null ? 
offsetFactory.create(segmentZKMetadata.getEndOffset())
+                    : null, segmentZKMetadata.getStatus().toString());
+        
partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+      }
+    } else {
+      // Multiple streams
+      StreamPartitionMsgOffsetFactory[] offsetFactories = new 
StreamPartitionMsgOffsetFactory[numStreams];
+      for (Map.Entry<Integer, LLCSegmentName> entry : 
partitionGroupIdToLatestSegment.entrySet()) {
+        int partitionGroupId = entry.getKey();
+        int index = 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId);
+        int streamPartitionId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+        LLCSegmentName llcSegmentName = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName());
+        StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index];
+        if (offsetFactory == null) {
+          offsetFactory = 
StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory();
+          offsetFactories[index] = offsetFactory;
+        }
+        PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+            new PartitionGroupConsumptionStatus(partitionGroupId, 
streamPartitionId, llcSegmentName.getSequenceNumber(),
+                offsetFactory.create(segmentZKMetadata.getStartOffset()),
+                segmentZKMetadata.getEndOffset() != null ? 
offsetFactory.create(segmentZKMetadata.getEndOffset())
+                    : null, segmentZKMetadata.getStatus().toString());
+        
partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+      }
     }
+
     return partitionGroupConsumptionStatusList;
   }
 
@@ -995,18 +1024,37 @@ public class PinotLLCRealtimeSegmentManager {
   Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState 
idealState) {
     Set<Integer> partitionIds = new HashSet<>();
     boolean allPartitionIdsFetched = true;
-    for (int i = 0; i < streamConfigs.size(); i++) {
-      final int index = i;
+    int numStreams = streamConfigs.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where 
custom stream might return partition id
+      // larger than 10000.
+      StreamConfig streamConfig = streamConfigs.get(0);
       try {
-        partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
-            .map(partitionId -> 
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, 
index))
-            .collect(Collectors.toSet()));
+        partitionIds = getPartitionIds(streamConfig);
       } catch (UnsupportedOperationException ignored) {
         allPartitionIdsFetched = false;
         // Stream does not support fetching partition ids. There is a log in 
the fallback code which is sufficient
       } catch (Exception e) {
         allPartitionIdsFetched = false;
-        LOGGER.warn("Failed to fetch partition ids for stream: {}", 
streamConfigs.get(i).getTopicName(), e);
+        LOGGER.warn("Failed to fetch partition ids for stream: {}", 
streamConfig.getTopicName(), e);
+      }
+    } else {
+      // Multiple streams
+      for (int i = 0; i < numStreams; i++) {
+        StreamConfig streamConfig = streamConfigs.get(i);
+        int index = i;
+        try {
+          partitionIds.addAll(getPartitionIds(streamConfig).stream()
+              .map(partitionId -> 
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, 
index))
+              .collect(Collectors.toSet()));
+        } catch (UnsupportedOperationException ignored) {
+          allPartitionIdsFetched = false;
+          // Stream does not support fetching partition ids. There is a log in 
the fallback code which is sufficient
+        } catch (Exception e) {
+          allPartitionIdsFetched = false;
+          LOGGER.warn("Failed to fetch partition ids for stream: {}", 
streamConfig.getTopicName(), e);
+        }
       }
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 25c286d3a2..d15f87efbd 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1367,7 +1367,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testGetPartitionIds()
       throws Exception {
     List<StreamConfig> streamConfigs = 
List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs());
-    IdealState idealState = new IdealState("table");
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
     segmentManager._numPartitions = 2;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c34a6e373e..114f968a11 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -293,12 +293,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private Thread _consumerThread;
   // _partitionGroupId represents the Pinot's internal partition number which 
will eventually be used as part of
   // segment name.
-  // _streamPatitionGroupId represents the partition number in the stream 
topic, which could be derived from the
+  // _streamPartitionId represents the partition number in the stream topic, 
which could be derived from the
   // _partitionGroupId and identify which partition of the stream topic this 
consumer is consuming from.
   // Note that in traditional single topic ingestion mode, those two concepts 
were identical which got separated
   // in multi-topic ingestion mode.
   private final int _partitionGroupId;
-  private final int _streamPatitionGroupId;
+  private final int _streamPartitionId;
   private final PartitionGroupConsumptionStatus 
_partitionGroupConsumptionStatus;
   final String _clientId;
   private final TransformPipeline _transformPipeline;
@@ -1638,9 +1638,22 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     _partitionGroupId = llcSegmentName.getPartitionGroupId();
-    _streamPatitionGroupId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
-    _streamConfig = new StreamConfig(_tableNameWithType, 
IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
-        
.get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
+    List<Map<String, String>> streamConfigMaps = 
IngestionConfigUtils.getStreamConfigMaps(_tableConfig);
+    int numStreams = streamConfigMaps.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where 
custom stream might return partition id
+      // larger than 10000.
+      _streamPartitionId = _partitionGroupId;
+      _streamConfig = new StreamConfig(_tableNameWithType, 
streamConfigMaps.get(0));
+    } else {
+      // Multiple streams
+      _streamPartitionId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
+      int index = 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId);
+      Preconditions.checkState(numStreams > index, "Cannot find stream config 
of index: %s for table: %s", index,
+          _tableNameWithType);
+      _streamConfig = new StreamConfig(_tableNameWithType, 
streamConfigMaps.get(index));
+    }
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     _streamPartitionMsgOffsetFactory = 
_streamConsumerFactory.createStreamMsgOffsetFactory();
     String streamTopic = _streamConfig.getTopicName();
@@ -1655,9 +1668,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     String clientIdSuffix =
         instanceDataManagerConfig != null ? 
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
     if (StringUtils.isNotBlank(clientIdSuffix)) {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPatitionGroupId + "-" + clientIdSuffix;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPartitionId + "-" + clientIdSuffix;
     } else {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPatitionGroupId;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPartitionId;
     }
     _segmentLogger = 
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + 
_segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + streamTopic;
@@ -1977,8 +1990,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private void createPartitionMetadataProvider(String reason) {
     closePartitionMetadataProvider();
     _segmentLogger.info("Creating new partition metadata provider, reason: 
{}", reason);
-    _partitionMetadataProvider = 
_streamConsumerFactory.createPartitionMetadataProvider(
-        _clientId, _streamPatitionGroupId);
+    _partitionMetadataProvider =
+        _streamConsumerFactory.createPartitionMetadataProvider(_clientId, 
_streamPartitionId);
   }
 
   private void updateIngestionMetrics(RowMetadata metadata) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
index bc02df8462..d0405906cd 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
@@ -18,49 +18,51 @@
  */
 package org.apache.pinot.spi.stream;
 
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-
-
 /**
  * A PartitionGroup is a group of partitions/shards that the same consumer 
should consume from.
  * This class contains all information which describes the latest state of a 
partition group.
  * It is constructed by looking at the segment zk metadata of the latest 
segment of each partition group.
  * It consists of:
  * 1. partitionGroupId - A unique ID for the partitionGroup
- * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at
- * 3. startOffset - The start offset that the latest segment started consuming 
from
- * 4. endOffset - The endOffset (if segment consuming from this partition 
group has finished consuming the segment
+ * 2. streamPartitionId - Partition ID of the stream that this partitionGroup 
belongs to.
+ * 3. sequenceNumber - The sequenceNumber this partitionGroup is currently at
+ * 4. startOffset - The start offset that the latest segment started consuming 
from
+ * 5. endOffset - The endOffset (if segment consuming from this partition 
group has finished consuming the segment
  * and recorded the end
  * offset)
- * 5. status - the consumption status IN_PROGRESS/DONE
+ * 6. status - the consumption status IN_PROGRESS/DONE
  *
  * This information is needed by the stream, when grouping the 
partitions/shards into new partition groups.
  */
 public class PartitionGroupConsumptionStatus {
-
   private final int _partitionGroupId;
-  private final int _streamPartitionGroupId;
+  private final int _streamPartitionId;
   private int _sequenceNumber;
   private StreamPartitionMsgOffset _startOffset;
   private StreamPartitionMsgOffset _endOffset;
   private String _status;
 
-  public PartitionGroupConsumptionStatus(int partitionGroupId, int 
sequenceNumber, StreamPartitionMsgOffset startOffset,
-      StreamPartitionMsgOffset endOffset, String status) {
+  public PartitionGroupConsumptionStatus(int partitionGroupId, int 
streamPartitionId, int sequenceNumber,
+      StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset 
endOffset, String status) {
     _partitionGroupId = partitionGroupId;
-    _streamPartitionGroupId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+    _streamPartitionId = streamPartitionId;
     _sequenceNumber = sequenceNumber;
     _startOffset = startOffset;
     _endOffset = endOffset;
     _status = status;
   }
 
+  public PartitionGroupConsumptionStatus(int partitionGroupId, int 
sequenceNumber, StreamPartitionMsgOffset startOffset,
+      StreamPartitionMsgOffset endOffset, String status) {
+    this(partitionGroupId, partitionGroupId, sequenceNumber, startOffset, 
endOffset, status);
+  }
+
   public int getPartitionGroupId() {
     return _partitionGroupId;
   }
 
   public int getStreamPartitionGroupId() {
-    return _streamPartitionGroupId;
+    return _streamPartitionId;
   }
 
   public int getSequenceNumber() {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 53f0e33ed1..bf05ea0285 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -32,23 +32,19 @@ import org.slf4j.LoggerFactory;
  * using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
-  private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
   private final List<StreamConfig> _streamConfigs;
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
-  private Exception _exception;
-  private final List<String> _topicNames;
   private final boolean _forceGetOffsetFromStream;
+  private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList = 
new ArrayList<>();
+
+  private Exception _exception;
 
   public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList,
-      boolean forceGetOffsetFromStream) {
-    _topicNames = 
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
     _streamConfigs = streamConfigs;
     _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
-    _newPartitionGroupMetadataList = new ArrayList<>();
     _forceGetOffsetFromStream = forceGetOffsetFromStream;
   }
 
@@ -69,39 +65,74 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
   public Boolean call()
       throws Exception {
     _newPartitionGroupMetadataList.clear();
-    for (int i = 0; i < _streamConfigs.size(); i++) {
-      String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + 
"-"
-          + _streamConfigs.get(i).getTableNameWithType() + "-" + 
_topicNames.get(i);
-      StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
-      final int index = i;
+    return _streamConfigs.size() == 1 ? fetchSingleStream() : 
fetchMultipleStreams();
+  }
+
+  private Boolean fetchSingleStream()
+      throws Exception {
+    StreamConfig streamConfig = _streamConfigs.get(0);
+    String topicName = streamConfig.getTopicName();
+    String clientId =
+        PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + 
streamConfig.getTableNameWithType() + "-"
+            + topicName;
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
+        StreamConsumerFactory.getUniqueClientId(clientId))) {
+      
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
 streamConfig,
+          _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, 
_forceGetOffsetFromStream));
+      if (_exception != null) {
+        // We had at least one failure, but succeeded now. Log an info
+        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", topicName);
+      }
+    } catch (TransientConsumerException e) {
+      LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", topicName, e);
+      _exception = e;
+      return Boolean.FALSE;
+    } catch (Exception e) {
+      LOGGER.warn("Could not get partition count for topic {}", topicName, e);
+      _exception = e;
+      throw e;
+    }
+    return Boolean.TRUE;
+  }
+
+  private Boolean fetchMultipleStreams()
+      throws Exception {
+    int numStreams = _streamConfigs.size();
+    for (int i = 0; i < numStreams; i++) {
+      StreamConfig streamConfig = _streamConfigs.get(i);
+      String topicName = streamConfig.getTopicName();
+      String clientId =
+          PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + 
streamConfig.getTableNameWithType() + "-"
+              + topicName;
+      StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+      int index = i;
       List<PartitionGroupConsumptionStatus> 
topicPartitionGroupConsumptionStatusList =
           _partitionGroupConsumptionStatusList.stream()
-              .filter(partitionGroupConsumptionStatus ->
-                  
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
-                      partitionGroupConsumptionStatus.getPartitionGroupId()) 
== index)
+              .filter(partitionGroupConsumptionStatus -> 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+                  partitionGroupConsumptionStatus.getPartitionGroupId()) == 
index)
               .collect(Collectors.toList());
-      try (
-          StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
-              StreamConsumerFactory.getUniqueClientId(clientId))) {
+      try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
+          StreamConsumerFactory.getUniqueClientId(clientId))) {
         _newPartitionGroupMetadataList.addAll(
-            
streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
-                _streamConfigs.get(i),
-                topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream()
+            streamMetadataProvider.computePartitionGroupMetadata(clientId,
+                    streamConfig, topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000,
+                    _forceGetOffsetFromStream)
+                .stream()
                 .map(metadata -> new PartitionGroupMetadata(
-                    
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
-                        metadata.getPartitionGroupId(), index),
-                    metadata.getStartOffset())).collect(Collectors.toList())
-        );
+                    
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
+                        index), metadata.getStartOffset()))
+                .collect(Collectors.toList()));
         if (_exception != null) {
           // We had at least one failure, but succeeded now. Log an info
-          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", _topicNames.get(i));
+          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", topicName);
         }
       } catch (TransientConsumerException e) {
-        LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", _topicNames.get(i), e);
+        LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", topicName, e);
         _exception = e;
         return Boolean.FALSE;
       } catch (Exception e) {
-        LOGGER.warn("Could not get partition count for topic {}", 
_topicNames.get(i), e);
+        LOGGER.warn("Could not get partition count for topic {}", topicName, 
e);
         _exception = e;
         throw e;
       }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 99766db9e0..6e51b4e7be 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -128,19 +128,20 @@ public final class IngestionConfigUtils {
    * Fetches the streamConfig from the list of streamConfigs according to the 
partition id.
    */
   public static Map<String, String> getStreamConfigMap(TableConfig 
tableConfig, int partitionId) {
-    if (partitionId < PARTITION_PADDING_OFFSET) {
-      return getFirstStreamConfigMap(tableConfig);
+    List<Map<String, String>> streamConfigMaps = 
getStreamConfigMaps(tableConfig);
+    int numStreams = streamConfigMaps.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where 
custom stream might return partition id
+      // larger than 10000.
+      return streamConfigMaps.get(0);
+    } else {
+      // Multiple streams
+      int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
+      Preconditions.checkState(numStreams > index, "Cannot find stream config 
of index: %s for table: %s", index,
+          tableConfig.getTableName());
+      return streamConfigMaps.get(index);
     }
-
-    String tableNameWithType = tableConfig.getTableName();
-    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
-        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
-    int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
-    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
-    Preconditions.checkState(ingestionConfig != null && 
ingestionConfig.getStreamIngestionConfig() != null
-            && 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size() > index,
-        "Cannot find stream config of index: %s for table: %s", index, 
tableNameWithType);
-    return 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().get(index);
   }
 
   public static List<AggregationConfig> getAggregationConfigs(TableConfig 
tableConfig) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to