This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 8885da0d8b Always use segment partition id as stream partition id for single stream (#15957)
8885da0d8b is described below

commit 8885da0d8b2ff8b3785b0ccfc0ba54cda31027c0
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Jun 6 15:04:57 2025 -0600

    Always use segment partition id as stream partition id for single stream (#15957)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 86 ++++++++++++++++-----
 .../PinotLLCRealtimeSegmentManagerTest.java        |  2 +-
 .../realtime/RealtimeSegmentDataManager.java       | 31 +++++---
 .../stream/PartitionGroupConsumptionStatus.java    | 28 +++----
 .../spi/stream/PartitionGroupMetadataFetcher.java  | 89 +++++++++++++++-------
 .../pinot/spi/utils/IngestionConfigUtils.java      | 25 +++---
 6 files changed, 178 insertions(+), 83 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d2445f3234..a7b04e2766 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -276,20 +276,49 @@ public class PinotLLCRealtimeSegmentManager {
     }
     // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
-    StreamPartitionMsgOffsetFactory offsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
-    for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
-      int partitionGroupId = entry.getKey();
-      LLCSegmentName llcSegmentName = entry.getValue();
-      SegmentZKMetadata segmentZKMetadata =
-          getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName());
-      PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
-          new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(),
-              offsetFactory.create(segmentZKMetadata.getStartOffset()),
-              segmentZKMetadata.getEndOffset() == null ? null : offsetFactory.create(segmentZKMetadata.getEndOffset()),
-              segmentZKMetadata.getStatus().toString());
-      partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+    String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
+    int numStreams = streamConfigs.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id
+      // larger than 10000.
+      StreamConfig streamConfig = streamConfigs.get(0);
+      StreamPartitionMsgOffsetFactory offsetFactory =
+          StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+      for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
+        int partitionGroupId = entry.getKey();
+        LLCSegmentName llcSegmentName = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName());
+        PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+            new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(),
+                offsetFactory.create(segmentZKMetadata.getStartOffset()),
+                segmentZKMetadata.getEndOffset() != null ?
offsetFactory.create(segmentZKMetadata.getEndOffset()) + : null, segmentZKMetadata.getStatus().toString()); + partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus); + } + } else { + // Multiple streams + StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams]; + for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) { + int partitionGroupId = entry.getKey(); + int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + LLCSegmentName llcSegmentName = entry.getValue(); + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName()); + StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index]; + if (offsetFactory == null) { + offsetFactory = StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory(); + offsetFactories[index] = offsetFactory; + } + PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = + new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), + offsetFactory.create(segmentZKMetadata.getStartOffset()), + segmentZKMetadata.getEndOffset() != null ? offsetFactory.create(segmentZKMetadata.getEndOffset()) + : null, segmentZKMetadata.getStatus().toString()); + partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus); + } } + return partitionGroupConsumptionStatusList; } @@ -995,18 +1024,37 @@ public class PinotLLCRealtimeSegmentManager { Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) { Set<Integer> partitionIds = new HashSet<>(); boolean allPartitionIdsFetched = true; - for (int i = 0; i < streamConfigs.size(); i++) { - final int index = i; + int numStreams = streamConfigs.size(); + if (numStreams == 1) { + // Single stream + // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id + // larger than 10000. + StreamConfig streamConfig = streamConfigs.get(0); try { - partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream() - .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) - .collect(Collectors.toSet())); + partitionIds = getPartitionIds(streamConfig); } catch (UnsupportedOperationException ignored) { allPartitionIdsFetched = false; // Stream does not support fetching partition ids. There is a log in the fallback code which is sufficient } catch (Exception e) { allPartitionIdsFetched = false; - LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e); + LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfig.getTopicName(), e); + } + } else { + // Multiple streams + for (int i = 0; i < numStreams; i++) { + StreamConfig streamConfig = streamConfigs.get(i); + int index = i; + try { + partitionIds.addAll(getPartitionIds(streamConfig).stream() + .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + .collect(Collectors.toSet())); + } catch (UnsupportedOperationException ignored) { + allPartitionIdsFetched = false; + // Stream does not support fetching partition ids. 
There is a log in the fallback code which is sufficient + } catch (Exception e) { + allPartitionIdsFetched = false; + LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfig.getTopicName(), e); + } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 25c286d3a2..d15f87efbd 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1367,7 +1367,7 @@ public class PinotLLCRealtimeSegmentManagerTest { public void testGetPartitionIds() throws Exception { List<StreamConfig> streamConfigs = List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs()); - IdealState idealState = new IdealState("table"); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(); segmentManager._numPartitions = 2; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index c34a6e373e..114f968a11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -293,12 +293,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private Thread _consumerThread; // _partitionGroupId represents the Pinot's internal partition number which will eventually be used as part of // segment name. - // _streamPatitionGroupId represents the partition number in the stream topic, which could be derived from the + // _streamPartitionId represents the partition number in the stream topic, which could be derived from the // _partitionGroupId and identify which partition of the stream topic this consumer is consuming from. // Note that in traditional single topic ingestion mode, those two concepts were identical which got separated // in multi-topic ingestion mode. 
private final int _partitionGroupId; - private final int _streamPatitionGroupId; + private final int _streamPartitionId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; private final TransformPipeline _transformPipeline; @@ -1638,9 +1638,22 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); _partitionGroupId = llcSegmentName.getPartitionGroupId(); - _streamPatitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); - _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMaps(_tableConfig) - .get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId))); + List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(_tableConfig); + int numStreams = streamConfigMaps.size(); + if (numStreams == 1) { + // Single stream + // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id + // larger than 10000. + _streamPartitionId = _partitionGroupId; + _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMaps.get(0)); + } else { + // Multiple streams + _streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); + int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId); + Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index, + _tableNameWithType); + _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMaps.get(index)); + } _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory(); String streamTopic = _streamConfig.getTopicName(); @@ -1655,9 +1668,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { String clientIdSuffix = instanceDataManagerConfig != null ? 
instanceDataManagerConfig.getConsumerClientIdSuffix() : null; if (StringUtils.isNotBlank(clientIdSuffix)) { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPatitionGroupId + "-" + clientIdSuffix; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionId + "-" + clientIdSuffix; } else { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPatitionGroupId; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPartitionId; } _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; @@ -1977,8 +1990,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private void createPartitionMetadataProvider(String reason) { closePartitionMetadataProvider(); _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason); - _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider( - _clientId, _streamPatitionGroupId); + _partitionMetadataProvider = + _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _streamPartitionId); } private void updateIngestionMetrics(RowMetadata metadata) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java index bc02df8462..d0405906cd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java @@ -18,49 +18,51 @@ */ package org.apache.pinot.spi.stream; -import org.apache.pinot.spi.utils.IngestionConfigUtils; - - /** * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. * This class contains all information which describes the latest state of a partition group. * It is constructed by looking at the segment zk metadata of the latest segment of each partition group. * It consists of: * 1. partitionGroupId - A unique ID for the partitionGroup - * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at - * 3. startOffset - The start offset that the latest segment started consuming from - * 4. endOffset - The endOffset (if segment consuming from this partition group has finished consuming the segment + * 2. streamPartitionId - Partition ID of the stream that this partitionGroup belongs to. + * 3. sequenceNumber - The sequenceNumber this partitionGroup is currently at + * 4. startOffset - The start offset that the latest segment started consuming from + * 5. endOffset - The endOffset (if segment consuming from this partition group has finished consuming the segment * and recorded the end * offset) - * 5. status - the consumption status IN_PROGRESS/DONE + * 6. status - the consumption status IN_PROGRESS/DONE * * This information is needed by the stream, when grouping the partitions/shards into new partition groups. 
*/ public class PartitionGroupConsumptionStatus { - private final int _partitionGroupId; - private final int _streamPartitionGroupId; + private final int _streamPartitionId; private int _sequenceNumber; private StreamPartitionMsgOffset _startOffset; private StreamPartitionMsgOffset _endOffset; private String _status; - public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, - StreamPartitionMsgOffset endOffset, String status) { + public PartitionGroupConsumptionStatus(int partitionGroupId, int streamPartitionId, int sequenceNumber, + StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, String status) { _partitionGroupId = partitionGroupId; - _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + _streamPartitionId = streamPartitionId; _sequenceNumber = sequenceNumber; _startOffset = startOffset; _endOffset = endOffset; _status = status; } + public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, + StreamPartitionMsgOffset endOffset, String status) { + this(partitionGroupId, partitionGroupId, sequenceNumber, startOffset, endOffset, status); + } + public int getPartitionGroupId() { return _partitionGroupId; } public int getStreamPartitionGroupId() { - return _streamPartitionGroupId; + return _streamPartitionId; } public int getSequenceNumber() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 53f0e33ed1..bf05ea0285 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -32,23 +32,19 @@ import org.slf4j.LoggerFactory; * using the {@link StreamMetadataProvider} */ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); - private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList; private final List<StreamConfig> _streamConfigs; private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; - private Exception _exception; - private final List<String> _topicNames; private final boolean _forceGetOffsetFromStream; + private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList = new ArrayList<>(); + + private Exception _exception; public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, - boolean forceGetOffsetFromStream) { - _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) { _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; - _newPartitionGroupMetadataList = new ArrayList<>(); _forceGetOffsetFromStream = forceGetOffsetFromStream; } @@ -69,39 +65,74 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { public Boolean call() throws Exception { _newPartitionGroupMetadataList.clear(); - for (int i = 0; i < _streamConfigs.size(); i++) { - String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" - + 
_streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i); - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i)); - final int index = i; + return _streamConfigs.size() == 1 ? fetchSingleStream() : fetchMultipleStreams(); + } + + private Boolean fetchSingleStream() + throws Exception { + StreamConfig streamConfig = _streamConfigs.get(0); + String topicName = streamConfig.getTopicName(); + String clientId = + PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" + + topicName; + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( + StreamConsumerFactory.getUniqueClientId(clientId))) { + _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, streamConfig, + _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream)); + if (_exception != null) { + // We had at least one failure, but succeeded now. Log an info + LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", topicName); + } + } catch (TransientConsumerException e) { + LOGGER.warn("Transient Exception: Could not get partition count for topic {}", topicName, e); + _exception = e; + return Boolean.FALSE; + } catch (Exception e) { + LOGGER.warn("Could not get partition count for topic {}", topicName, e); + _exception = e; + throw e; + } + return Boolean.TRUE; + } + + private Boolean fetchMultipleStreams() + throws Exception { + int numStreams = _streamConfigs.size(); + for (int i = 0; i < numStreams; i++) { + StreamConfig streamConfig = _streamConfigs.get(i); + String topicName = streamConfig.getTopicName(); + String clientId = + PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" + + topicName; + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + int index = i; List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList = _partitionGroupConsumptionStatusList.stream() - .filter(partitionGroupConsumptionStatus -> - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( - partitionGroupConsumptionStatus.getPartitionGroupId()) == index) + .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( + partitionGroupConsumptionStatus.getPartitionGroupId()) == index) .collect(Collectors.toList()); - try ( - StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( - StreamConsumerFactory.getUniqueClientId(clientId))) { + try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( + StreamConsumerFactory.getUniqueClientId(clientId))) { _newPartitionGroupMetadataList.addAll( - streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId), - _streamConfigs.get(i), - topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream() + streamMetadataProvider.computePartitionGroupMetadata(clientId, + streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, + _forceGetOffsetFromStream) + .stream() .map(metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( - metadata.getPartitionGroupId(), 
index),
-                    metadata.getStartOffset())).collect(Collectors.toList())
-        );
+                    IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
+                        index), metadata.getStartOffset()))
+                .collect(Collectors.toList()));
       if (_exception != null) {
         // We had at least one failure, but succeeded now. Log an info
-        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicNames.get(i));
+        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", topicName);
       }
     } catch (TransientConsumerException e) {
-      LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicNames.get(i), e);
+      LOGGER.warn("Transient Exception: Could not get partition count for topic {}", topicName, e);
       _exception = e;
       return Boolean.FALSE;
     } catch (Exception e) {
-      LOGGER.warn("Could not get partition count for topic {}", _topicNames.get(i), e);
+      LOGGER.warn("Could not get partition count for topic {}", topicName, e);
       _exception = e;
       throw e;
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 99766db9e0..6e51b4e7be 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -128,19 +128,20 @@ public final class IngestionConfigUtils {
    * Fetches the streamConfig from the list of streamConfigs according to the partition id.
    */
   public static Map<String, String> getStreamConfigMap(TableConfig tableConfig, int partitionId) {
-    if (partitionId < PARTITION_PADDING_OFFSET) {
-      return getFirstStreamConfigMap(tableConfig);
+    List<Map<String, String>> streamConfigMaps = getStreamConfigMaps(tableConfig);
+    int numStreams = streamConfigMaps.size();
+    if (numStreams == 1) {
+      // Single stream
+      // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id
+      // larger than 10000.
+      return streamConfigMaps.get(0);
+    } else {
+      // Multiple streams
+      int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
+      Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index,
+          tableConfig.getTableName());
+      return streamConfigMaps.get(index);
     }
-
-    String tableNameWithType = tableConfig.getTableName();
-    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
-        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
-    int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
-    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
-    Preconditions.checkState(ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null
-        && ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size() > index,
-        "Cannot find stream config of index: %s for table: %s", index, tableNameWithType);
-    return ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().get(index);
   }
 
   public static List<AggregationConfig> getAggregationConfigs(TableConfig tableConfig) {
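For context on the "NOTE: We skip partition id translation logic ..." comments in the diff: with multiple streams configured, the Pinot partition id encodes both the stream config index and the partition id within that stream. The sketch below is illustrative only; it assumes the PARTITION_PADDING_OFFSET referenced in IngestionConfigUtils is 10000 (as the "larger than 10000" note suggests), and the helper names are simplified stand-ins rather than the actual IngestionConfigUtils methods.

    // Illustrative sketch, not the actual Pinot implementation.
    // Assumes PARTITION_PADDING_OFFSET == 10000, per the "larger than 10000" note above.
    public final class PartitionIdTranslationSketch {
      private static final int PARTITION_PADDING_OFFSET = 10_000;

      // Multi-stream ingestion: fold the stream config index into the Pinot partition id.
      static int toPinotPartitionId(int streamPartitionId, int streamConfigIndex) {
        return streamConfigIndex * PARTITION_PADDING_OFFSET + streamPartitionId;
      }

      // Recover the stream partition id from a Pinot partition id.
      static int toStreamPartitionId(int pinotPartitionId) {
        return pinotPartitionId % PARTITION_PADDING_OFFSET;
      }

      // Recover the stream config index from a Pinot partition id.
      static int toStreamConfigIndex(int pinotPartitionId) {
        return pinotPartitionId / PARTITION_PADDING_OFFSET;
      }

      public static void main(String[] args) {
        // Partition 3 of the second configured stream (index 1) maps to Pinot partition id 10003.
        int pinotId = toPinotPartitionId(3, 1);
        System.out.println(pinotId + " -> stream partition " + toStreamPartitionId(pinotId)
            + ", stream config index " + toStreamConfigIndex(pinotId));
        // With a single stream, this commit uses the segment partition id as the stream partition id
        // directly, so a custom stream reporting e.g. partition id 25000 is no longer misread as
        // stream config index 2 / stream partition 5000.
      }
    }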
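The PartitionGroupConsumptionStatus change keeps the old 5-argument constructor, which now delegates with the partition group id reused as the stream partition id, and adds a 6-argument constructor that takes the stream partition id explicitly for the multi-stream case. A usage sketch, assuming the LongMsgOffset implementation of StreamPartitionMsgOffset from pinot-spi and made-up sequence numbers, offsets and statuses:

    import org.apache.pinot.spi.stream.LongMsgOffset;
    import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;

    public class PartitionGroupConsumptionStatusExample {
      public static void main(String[] args) {
        // Single stream: the 5-arg constructor reuses the partition group id (7) as the stream
        // partition id; sequence number 12, consuming from offset 100, no end offset recorded yet.
        PartitionGroupConsumptionStatus singleStream =
            new PartitionGroupConsumptionStatus(7, 12, new LongMsgOffset(100L), null, "IN_PROGRESS");

        // Multiple streams: pass the stream partition id explicitly, e.g. Pinot partition group
        // 10003 consuming partition 3 of the second configured stream.
        PartitionGroupConsumptionStatus multiStream =
            new PartitionGroupConsumptionStatus(10003, 3, 12, new LongMsgOffset(100L), null, "IN_PROGRESS");

        System.out.println(singleStream.getPartitionGroupId() + " / " + singleStream.getStreamPartitionGroupId());
        System.out.println(multiStream.getPartitionGroupId() + " / " + multiStream.getStreamPartitionGroupId());
      }
    }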