This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 73abb21c23 Add multi stream ingestion support (#13790)
73abb21c23 is described below

commit 73abb21c23d01d62f26e8f2a1eb8fddcd0f75aa2
Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com>
AuthorDate: Thu Dec 19 10:39:38 2024 -0800

    Add multi stream ingestion support (#13790)
    
    * Add multi stream ingestion support
    
    * Fix UT
    
    * Fix issues, rebase and resolve comments
    
    * Resolve comments
    
    * Fix style
    
    * Ensure transient exceptions do not prevent creating new consuming segments
    
    Summary:
    Ensure transient exceptions do not prevent creating new consuming segments. 
If some exception is hit, attempt to reconcile any successful fetches with 
partition group metadata.
    
    This ensures consuming partitions are not dropped, and attempts to add any 
new partitions discovered successfully.
    
    Test Plan:
    After deployment, despite some remaining `TransientConsumerException` errors, 
no new missing consuming segments appeared
    {F1002071843}
    
    {F1002071523}
    
    Reviewers: gaoxin, tingchen
    
    Reviewed By: gaoxin
    
    JIRA Issues: EVA-8951
    
    Differential Revision: https://code.uberinternal.com/D15748639
    
    * Resolve comments for optimizing java doc
    
    * Edit doc/comment
    
    * Remove unrelated files
    
    * Rebase and resolve conflicts
    
    * Take the metadata fetch time change from the HEAD
    
    * Resolve conflicts
    
    ---------
    
    Co-authored-by: Christopher Peck <p...@uber.com>
---
 .../pinot/controller/BaseControllerStarter.java    |  12 +-
 .../controller/helix/SegmentStatusChecker.java     |   8 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  12 +-
 .../realtime/MissingConsumingSegmentFinder.java    |  18 ++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 132 +++++++++++++--------
 .../core/realtime/SegmentCompletionManager.java    |   4 +-
 .../RealtimeSegmentValidationManager.java          |  12 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  52 ++++++--
 .../provider/DefaultTableDataManagerProvider.java  |   2 +-
 .../realtime/RealtimeSegmentDataManager.java       |  22 +++-
 .../manager/realtime/SegmentCommitterFactory.java  |   2 +-
 .../segment/local/utils/TableConfigUtils.java      |  24 ++--
 .../segment/local/utils/TableConfigUtilsTest.java  |   5 +-
 .../stream/PartitionGroupConsumptionStatus.java    |   9 ++
 .../spi/stream/PartitionGroupMetadataFetcher.java  |  74 +++++++-----
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  10 +-
 .../pinot/spi/stream/StreamConsumerFactory.java    |   2 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   |   2 +-
 .../pinot/spi/utils/IngestionConfigUtils.java      | 100 +++++++++++++++-
 .../pinot/spi/utils/IngestionConfigUtilsTest.java  |  19 +--
 20 files changed, 367 insertions(+), 154 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 342413d355..0326f97a7b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -257,7 +257,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       // This executor service is used to do async tasks from multiget util or 
table rebalancing.
       _executorService = 
createExecutorService(_config.getControllerExecutorNumThreads(), 
"async-task-thread-%d");
       _tenantRebalanceExecutorService = 
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
-              "tenant-rebalance-thread-%d");
+          "tenant-rebalance-thread-%d");
       _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
     }
 
@@ -272,7 +272,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   private ExecutorService createExecutorService(int numThreadPool, String 
threadNameFormat) {
     ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
     return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
-            : Executors.newFixedThreadPool(numThreadPool, threadFactory);
+        : Executors.newFixedThreadPool(numThreadPool, threadFactory);
   }
 
   private void inferHostnameIfNeeded(ControllerConf config) {
@@ -577,10 +577,12 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
       TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
       if (tableConfig != null) {
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        List<Map<String, String>> streamConfigMaps = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
         try {
-          
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
 "kafka"),
-              streamConfigMap);
+          for (Map<String, String> streamConfigMap : streamConfigMaps) {
+            
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
 "kafka"),
+                streamConfigMap);
+          }
         } catch (Exception e) {
           existingHlcTables.add(rt);
         }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index c9a48022c0..1a5f542dd7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -419,10 +420,11 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
         numInvalidEndTime);
 
     if (tableType == TableType.REALTIME && tableConfig != null) {
-      StreamConfig streamConfig =
-          new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+          streamConfig -> new StreamConfig(tableConfig.getTableName(), 
streamConfig)
+      ).collect(Collectors.toList());
       new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, 
_controllerMetrics,
-          streamConfig).findAndEmitMetrics(idealState);
+          streamConfigs).findAndEmitMetrics(idealState);
     }
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 23a115417f..8895d9df50 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -54,6 +54,7 @@ public class PinotTableIdealStateBuilder {
   /**
    * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
    * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
+   * In particular, this method can also be used to fetch from multiple stream 
topics.
    *
    * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
    *
@@ -79,23 +80,24 @@ public class PinotTableIdealStateBuilder {
    * the collection of shards in partition group 1, should remain unchanged in 
the response,
    * whereas shards 3,4 can be added to new partition groups if needed.
    *
-   * @param streamConfig the streamConfig from the tableConfig
+   * @param streamConfigs the List of streamConfig from the tableConfig
    * @param partitionGroupConsumptionStatusList List of {@link 
PartitionGroupConsumptionStatus} for the current
    *                                            partition groups.
    *                                          The size of this list is equal 
to the number of partition groups,
    *                                          and is created using the latest 
segment zk metadata.
    */
-  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
+  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
+        new PartitionGroupMetadataFetcher(streamConfigs, 
partitionGroupConsumptionStatusList);
     try {
       
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
     } catch (Exception e) {
       Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
-          streamConfig.getTableNameWithType(), fetcherException);
+      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}",
+          streamConfigs.stream().map(streamConfig -> 
streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
+          streamConfigs.get(0).getTableNameWithType(), fetcherException);
       throw new RuntimeException(fetcherException);
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..5fe2ffe6d6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -24,7 +24,9 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder {
   private ControllerMetrics _controllerMetrics;
 
   public MissingConsumingSegmentFinder(String realtimeTableName, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      ControllerMetrics controllerMetrics, StreamConfig streamConfig) {
+      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
     _realtimeTableName = realtimeTableName;
     _controllerMetrics = controllerMetrics;
     _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, 
controllerMetrics);
     _streamPartitionMsgOffsetFactory =
-        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
 
     // create partition group id to largest stream offset map
     _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
-    streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+    streamConfigs.stream().map(streamConfig -> {
+      streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+      return streamConfig;
+    });
     try {
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, 
Collections.emptyList())
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList())
           .forEach(metadata -> {
             
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
           });
     } catch (Exception e) {
-      LOGGER.warn("Problem encountered in fetching stream metadata for topic: 
{} of table: {}. "
+      LOGGER.warn("Problem encountered in fetching stream metadata for topics: 
{} of table: {}. "
               + "Continue finding missing consuming segment only with ideal 
state information.",
-          streamConfig.getTopicName(), streamConfig.getTableNameWithType());
+          streamConfigs.stream().map(streamConfig -> 
streamConfig.getTopicName()).collect(Collectors.toList()),
+          streamConfigs.get(0).getTableNameWithType());
     }
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 44ca01812a..4ba7cd2208 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -232,7 +232,7 @@ public class PinotLLCRealtimeSegmentManager {
    * for latest segment of each partition group.
    */
   public List<PartitionGroupConsumptionStatus> 
getPartitionGroupConsumptionStatusList(IdealState idealState,
-      StreamConfig streamConfig) {
+      List<StreamConfig> streamConfigs) {
     List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList 
= new ArrayList<>();
 
     // From all segment names in the ideal state, find unique partition group 
ids and their latest segment
@@ -257,12 +257,12 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
     StreamPartitionMsgOffsetFactory offsetFactory =
-        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
     for (Map.Entry<Integer, LLCSegmentName> entry : 
partitionGroupIdToLatestSegment.entrySet()) {
       int partitionGroupId = entry.getKey();
       LLCSegmentName llcSegmentName = entry.getValue();
       SegmentZKMetadata segmentZKMetadata =
-          getSegmentZKMetadata(streamConfig.getTableNameWithType(), 
llcSegmentName.getSegmentName());
+          getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), 
llcSegmentName.getSegmentName());
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
           new PartitionGroupConsumptionStatus(partitionGroupId, 
llcSegmentName.getSequenceNumber(),
               offsetFactory.create(segmentZKMetadata.getStartOffset()),
@@ -322,11 +322,12 @@ public class PinotLLCRealtimeSegmentManager {
 
     _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
 
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+        streamConfig -> new StreamConfig(tableConfig.getTableName(), 
streamConfig)
+    ).collect(Collectors.toList());
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, 
Collections.emptyList());
+        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
@@ -339,7 +340,8 @@ public class PinotLLCRealtimeSegmentManager {
     Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
     for (PartitionGroupMetadata partitionGroupMetadata : 
newPartitionGroupMetadataList) {
       String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
+          setupNewPartitionGroup(tableConfig, streamConfigs.get(0), 
partitionGroupMetadata, currentTimeMs,
+              instancePartitions,
               numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -548,29 +550,16 @@ public class PinotLLCRealtimeSegmentManager {
     long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
     if (!isTablePaused(idealState)) {
-      StreamConfig streamConfig =
-          new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-      Set<Integer> partitionIds;
-      try {
-        partitionIds = getPartitionIds(streamConfig);
-      } catch (Exception e) {
-        LOGGER.info("Failed to fetch partition ids from stream metadata 
provider for table: {}, exception: {}. "
-            + "Reading all partition group metadata to determine partition 
ids.", realtimeTableName, e.toString());
-        // TODO: Find a better way to determine partition count and if the 
committing partition group is fully consumed.
-        //       We don't need to read partition group metadata for other 
partition groups.
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
-        partitionIds = 
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-      }
+      List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+          streamConfig -> new StreamConfig(tableConfig.getTableName(), 
streamConfig)
+      ).collect(Collectors.toList());
+      Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
       if (partitionIds.contains(committingSegmentPartitionGroupId)) {
         String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
         long newSegmentCreationTimeMs = getCurrentTimeMs();
         LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, 
committingSegmentPartitionGroupId,
             committingLLCSegment.getSequenceNumber() + 1, 
newSegmentCreationTimeMs);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, 
newSegmentCreationTimeMs,
+        createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), 
newLLCSegment, newSegmentCreationTimeMs,
             committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, partitionIds.size(),
             numReplicas);
         newConsumingSegmentName = newLLCSegment.getSegmentName();
@@ -764,7 +753,7 @@ public class PinotLLCRealtimeSegmentManager {
       return commitTimeoutMS;
     }
     TableConfig tableConfig = getTableConfig(realtimeTableName);
-    final Map<String, String> streamConfigs = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    final Map<String, String> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     if 
(streamConfigs.containsKey(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS))
 {
       final String commitTimeoutSecondsStr = 
streamConfigs.get(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS);
       try {
@@ -793,15 +782,49 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  @VisibleForTesting
+  Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState 
idealState) {
+    Set<Integer> partitionIds = new HashSet<>();
+    boolean allPartitionIdsFetched = true;
+    for (int i = 0; i < streamConfigs.size(); i++) {
+      final int index = i;
+      try {
+        partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
+            .map(partitionId -> 
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, 
index))
+            .collect(Collectors.toSet()));
+      } catch (Exception e) {
+        allPartitionIdsFetched = false;
+        LOGGER.warn("Failed to fetch partition ids for stream: {}", 
streamConfigs.get(i).getTopicName(), e);
+      }
+    }
+
+    // If it is failing to fetch partition ids from stream (usually transient 
due to stream metadata service outage),
+    // we need to use the existing partition information from ideal state to 
keep same ingestion behavior.
+    if (!allPartitionIdsFetched) {
+      LOGGER.info(
+          "Fetch partition ids from Stream incomplete, merge fetched 
partitionIds with partition group metadata "
+              + "for: {}", idealState.getId());
+      // TODO: Find a better way to determine partition count and if the 
committing partition group is fully consumed.
+      //       We don't need to read partition group metadata for other 
partition groups.
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+      List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+      
partitionIds.addAll(newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+          .collect(Collectors.toSet()));
+    }
+    return partitionIds;
+  }
+
   /**
    * Fetches the latest state of the PartitionGroups for the stream
    * If any partition has reached end of life, and all messages of that 
partition have been consumed by the segment,
    * it will be skipped from the result
    */
   @VisibleForTesting
-  List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
+  List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
-    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
+    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
         currentPartitionGroupConsumptionStatusList);
   }
 
@@ -918,7 +941,7 @@ public class PinotLLCRealtimeSegmentManager {
    * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is 
ONLINE.
    * If so, it should create a new CONSUMING segment for the partition.
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, 
StreamConfig streamConfig,
+  public void ensureAllPartitionsConsuming(TableConfig tableConfig, 
List<StreamConfig> streamConfigs,
       OffsetCriteria offsetCriteria) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
@@ -932,15 +955,16 @@ public class PinotLLCRealtimeSegmentManager {
         List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
             offsetsHaveToChange
                 ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
-                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfig);
-        OffsetCriteria originalOffsetCriteria = 
streamConfig.getOffsetCriteria();
+                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
+        // FIXME: Right now, we assume topics are sharing same offset criteria
+        OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
         // Read the smallest offset when a new partition is detected
-        streamConfig.setOffsetCriteria(
-            offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+        streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(offsetsHaveToChange
+            ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
-        streamConfig.setOffsetCriteria(originalOffsetCriteria);
-        return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList,
+            getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+        streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
+        return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
             offsetCriteria);
       } else {
         LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
@@ -1160,8 +1184,8 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: split this method into multiple smaller methods
    */
   @VisibleForTesting
-  IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, 
StreamConfig streamConfig, IdealState idealState,
-      List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria 
offsetCriteria) {
+  IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, 
List<StreamConfig> streamConfigs,
+        IdealState idealState, List<PartitionGroupMetadata> 
partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
@@ -1175,7 +1199,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
     StreamPartitionMsgOffsetFactory offsetFactory =
-        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
 
     // Get the latest segment ZK metadata for each partition
     Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = 
getLatestSegmentZKMetadataMap(realtimeTableName);
@@ -1240,7 +1264,7 @@ public class PinotLLCRealtimeSegmentManager {
               CommittingSegmentDescriptor committingSegmentDescriptor =
                   new CommittingSegmentDescriptor(latestSegmentName,
                       
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-              createNewSegmentZKMetadata(tableConfig, streamConfig, 
newLLCSegmentName, currentTimeMs,
+              createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), 
newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
@@ -1274,7 +1298,7 @@ public class PinotLLCRealtimeSegmentManager {
 
           // Smallest offset is fetched from stream once and cached in 
partitionIdToSmallestOffset.
           if (partitionIdToSmallestOffset == null) {
-            partitionIdToSmallestOffset = 
fetchPartitionGroupIdToSmallestOffset(streamConfig);
+            partitionIdToSmallestOffset = 
fetchPartitionGroupIdToSmallestOffset(streamConfigs);
           }
 
           // Do not create new CONSUMING segment when the stream partition has 
reached end of life.
@@ -1288,7 +1312,7 @@ public class PinotLLCRealtimeSegmentManager {
                 selectStartOffset(offsetCriteria, partitionId, 
partitionIdToStartOffset,
                     partitionIdToSmallestOffset, tableConfig.getTableName(), 
offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are 
OFFLINE; start from beginning
-            createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(0), 
latestSegmentZKMetadata, currentTimeMs,
                 partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           } else {
@@ -1297,7 +1321,7 @@ public class PinotLLCRealtimeSegmentManager {
                 selectStartOffset(offsetCriteria, partitionId, 
partitionIdToStartOffset,
                     partitionIdToSmallestOffset, tableConfig.getTableName(), 
offsetFactory,
                     latestSegmentZKMetadata.getEndOffset());
-            createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(0), 
latestSegmentZKMetadata, currentTimeMs,
                 partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           }
@@ -1344,7 +1368,8 @@ public class PinotLLCRealtimeSegmentManager {
       int partitionId = partitionGroupMetadata.getPartitionGroupId();
       if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
         String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
+            setupNewPartitionGroup(tableConfig, streamConfigs.get(0), 
partitionGroupMetadata, currentTimeMs,
+                instancePartitions,
                 numPartitions, numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
newSegmentName, segmentAssignment,
             instancePartitionsMap);
@@ -1372,15 +1397,18 @@ public class PinotLLCRealtimeSegmentManager {
         instancePartitionsMap);
   }
 
-  private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
-    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
-    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-    List<PartitionGroupMetadata> partitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, 
Collections.emptyList());
-    streamConfig.setOffsetCriteria(originalOffsetCriteria);
+  private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(
+      List<StreamConfig> streamConfigs) {
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = 
new HashMap<>();
-    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
-      partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+    for (StreamConfig streamConfig : streamConfigs) {
+      OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+      streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+      List<PartitionGroupMetadata> partitionGroupMetadataList =
+          getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList());
+      streamConfig.setOffsetCriteria(originalOffsetCriteria);
+      for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+        partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+      }
     }
     return partitionGroupIdToSmallestOffset;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 63d302f929..5bb3f861d7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -102,7 +102,7 @@ public class SegmentCompletionManager {
     String rawTableName = llcSegmentName.getTableName();
     TableConfig tableConfig = 
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
     StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+        new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0));
     return 
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
   }
 
@@ -131,7 +131,7 @@ public class SegmentCompletionManager {
     TableConfig tableConfig = 
_segmentManager.getTableConfig(realtimeTableName);
     String factoryName = null;
     try {
-      Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
       factoryName = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME);
     } catch (Exception e) {
       // If there is an exception, we default to the default factory.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 88f1bc6ee6..dbe229ebc9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -104,14 +105,15 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
       return;
     }
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
+        streamConfig -> new StreamConfig(tableConfig.getTableName(), 
streamConfig)
+    ).collect(Collectors.toList());
     if (context._runSegmentLevelValidation) {
-      runSegmentLevelValidation(tableConfig, streamConfig);
+      runSegmentLevelValidation(tableConfig);
     }
 
     if (shouldEnsureConsuming(tableNameWithType)) {
-      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig, context._offsetCriteria);
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfigs, context._offsetCriteria);
     }
   }
 
@@ -147,7 +149,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     return !isQuotaExceeded;
   }
 
-  private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig 
streamConfig) {
+  private void runSegmentLevelValidation(TableConfig tableConfig) {
     String realtimeTableName = tableConfig.getTableName();
 
     List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 42bc697c75..dbe640d364 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -91,8 +91,8 @@ import org.testng.annotations.Test;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.*;
 
 
@@ -274,7 +274,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // committing segment's partitionGroupId no longer in the 
newPartitionGroupMetadataList
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList());
     partitionGroupMetadataListWithout0.remove(0);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout0;
 
@@ -595,7 +595,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
      */
     // 1 reached end of shard.
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList());
     partitionGroupMetadataListWithout1.remove(1);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout1;
     // noop
@@ -882,7 +882,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
     try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, 
segmentManager._streamConfig, null);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, 
segmentManager._streamConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -1217,6 +1217,36 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(numDeletedTmpSegments, 1);
   }
 
+  @Test
+  public void testGetPartitionIds()
+      throws Exception {
+    List<StreamConfig> streamConfigs = 
List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs());
+    IdealState idealState = new IdealState("table");
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numPartitions = 2;
+
+    // Test empty ideal state
+    Set<Integer> partitionIds = segmentManager.getPartitionIds(streamConfigs, 
idealState);
+    Assert.assertEquals(partitionIds.size(), 2);
+    partitionIds.clear();
+
+    // Simulate the case where getPartitionIds(StreamConfig) throws an 
exception (e.g. transient kafka connection issue)
+    PinotLLCRealtimeSegmentManager segmentManagerSpy = 
spy(FakePinotLLCRealtimeSegmentManager.class);
+    doThrow(new 
RuntimeException()).when(segmentManagerSpy).getPartitionIds(any(StreamConfig.class));
+    List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList =
+        List.of(new PartitionGroupConsumptionStatus(0, 12, new 
LongMsgOffset(123), new LongMsgOffset(234), "ONLINE"),
+            new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123), 
new LongMsgOffset(345), "ONLINE"));
+    doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy)
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List<PartitionGroupMetadata> partitionGroupMetadataList =
+        List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
+            new PartitionGroupMetadata(1, new LongMsgOffset(345)));
+    doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
+        .getNewPartitionGroupMetadataList(streamConfigs, 
partitionGroupConsumptionStatusList);
+    partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, 
idealState);
+    Assert.assertEquals(partitionIds.size(), 2);
+  }
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   
/////////////////////////////////////////////////////////////////////////////////
@@ -1230,7 +1260,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     int _numReplicas;
     TableConfig _tableConfig;
-    StreamConfig _streamConfig;
+    List<StreamConfig> _streamConfigs;
     int _numInstances;
     InstancePartitions _consumingInstancePartitions;
     Map<String, SegmentZKMetadata> _segmentZKMetadataMap = new HashMap<>();
@@ -1258,8 +1288,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _tableConfig =
           new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
               .setStreamConfigs(streamConfigs).build();
-      _streamConfig =
-          new StreamConfig(_tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+      _streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(_tableConfig).stream().map(
+          streamConfig -> new StreamConfig(_tableConfig.getTableName(), 
streamConfig)).collect(Collectors.toList());
     }
 
     void makeConsumingInstancePartitions() {
@@ -1277,8 +1307,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     public void ensureAllPartitionsConsuming() {
-      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, 
Collections.emptyList()), null);
+      ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
+          getNewPartitionGroupMetadataList(_streamConfigs, 
Collections.emptyList()), null);
     }
 
     @Override
@@ -1358,7 +1388,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
+    List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
         List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
       if (_partitionGroupMetadataList != null) {
         return _partitionGroupMetadataList;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index fff6232943..36caa5b86a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -73,7 +73,7 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
         }
         break;
       case REALTIME:
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
         if 
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
             && 
StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) {
           throw new IllegalStateException(String.format("Table has enabled %s 
config. But the server has not "
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 684e1ffa53..380b358a84 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -282,7 +282,14 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;
 
   private Thread _consumerThread;
+  // _partitionGroupId represents Pinot's internal partition number, which 
will eventually be used as part of the
+  // segment name.
+  // _streamPatitionGroupId represents the partition number in the stream 
topic; it can be derived from
+  // _partitionGroupId and identifies which partition of the stream topic this 
consumer is consuming from.
+  // Note that in the traditional single-topic ingestion mode these two 
concepts were identical; they are
+  // separated in multi-topic ingestion mode.
   private final int _partitionGroupId;
+  private final int _streamPatitionGroupId;
   private final PartitionGroupConsumptionStatus 
_partitionGroupConsumptionStatus;
   final String _clientId;
   private final TransformPipeline _transformPipeline;
@@ -1496,12 +1503,16 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
-    _streamConfig = new StreamConfig(_tableNameWithType, 
IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+    _partitionGroupId = llcSegmentName.getPartitionGroupId();
+    _streamPatitionGroupId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
+    _streamConfig = new StreamConfig(
+        _tableNameWithType,
+        IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
+            
.get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     _streamPartitionMsgOffsetFactory = 
_streamConsumerFactory.createStreamMsgOffsetFactory();
     String streamTopic = _streamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
-    _partitionGroupId = llcSegmentName.getPartitionGroupId();
     _partitionGroupConsumptionStatus =
         new PartitionGroupConsumptionStatus(_partitionGroupId, 
llcSegmentName.getSequenceNumber(),
             
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
@@ -1514,9 +1525,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     String clientIdSuffix =
         instanceDataManagerConfig != null ? 
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
     if (StringUtils.isNotBlank(clientIdSuffix)) {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_partitionGroupId + "-" + clientIdSuffix;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPatitionGroupId + "-" + clientIdSuffix;
     } else {
-      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_partitionGroupId;
+      _clientId = _tableNameWithType + "-" + streamTopic + "-" + 
_streamPatitionGroupId;
     }
     _segmentLogger = 
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + 
_segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + streamTopic;
@@ -1832,7 +1843,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private void createPartitionMetadataProvider(String reason) {
     closePartitionMetadataProvider();
     _segmentLogger.info("Creating new partition metadata provider, reason: 
{}", reason);
-    _partitionMetadataProvider = 
_streamConsumerFactory.createPartitionMetadataProvider(_clientId, 
_partitionGroupId);
+    _partitionMetadataProvider = 
_streamConsumerFactory.createPartitionMetadataProvider(
+        _clientId, _streamPatitionGroupId);
   }
 
   private void updateIngestionMetrics(RowMetadata metadata) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 33a3b55654..4224019ab0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -47,7 +47,7 @@ public class SegmentCommitterFactory {
     _protocolHandler = protocolHandler;
     _tableConfig = tableConfig;
     _streamConfig = new StreamConfig(_tableConfig.getTableName(),
-        IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+        IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0));
     _indexLoadingConfig = indexLoadingConfig;
     _serverMetrics = serverMetrics;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 387f69a442..141e0c280a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -169,15 +169,22 @@ public final class TableConfigUtils {
 
       // Only allow realtime tables with non-null stream.type and LLC 
consumer.type
       if (tableConfig.getTableType() == TableType.REALTIME) {
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        List<Map<String, String>> streamConfigMaps = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
+        if (streamConfigMaps.size() > 1) {
+          Preconditions.checkArgument(!tableConfig.isUpsertEnabled(),
+              "Multiple stream configs are not supported for upsert tables");
+        }
+        // TODO: validate stream configs in the map are identical in most 
fields
         StreamConfig streamConfig;
-        try {
-          // Validate that StreamConfig can be created
-          streamConfig = new StreamConfig(tableConfig.getTableName(), 
streamConfigMap);
-        } catch (Exception e) {
-          throw new IllegalStateException("Could not create StreamConfig using 
the streamConfig map", e);
+        for (Map<String, String> streamConfigMap : streamConfigMaps) {
+          try {
+            // Validate that StreamConfig can be created
+            streamConfig = new StreamConfig(tableConfig.getTableName(), 
streamConfigMap);
+          } catch (Exception e) {
+            throw new IllegalStateException("Could not create StreamConfig 
using the streamConfig map", e);
+          }
+          validateStreamConfig(streamConfig);
         }
-        validateStreamConfig(streamConfig);
       }
       validateTierConfigList(tableConfig.getTierConfigsList());
       validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
@@ -390,7 +397,8 @@ public final class TableConfigUtils {
         Preconditions.checkState(indexingConfig == null || 
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
             "Should not use indexingConfig#getStreamConfigs if 
ingestionConfig#StreamIngestionConfig is provided");
         List<Map<String, String>> streamConfigMaps = 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
-        Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream 
is supported in REALTIME table");
+        Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at 
least 1 stream in REALTIME table");
+        // TODO: for multiple stream configs, validate them
       }
 
       // Filter config
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 98b0ba552c..72a17ee7d1 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -684,12 +684,11 @@ public class TableConfigUtilsTest {
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
             .setIngestionConfig(ingestionConfig).build();
 
-    // only 1 stream config allowed
+    // Multiple stream configs are allowed
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
-      Assert.fail("Should fail for more than 1 stream config");
     } catch (IllegalStateException e) {
-      // expected
+      Assert.fail("Multiple stream configs should be supported");
     }
 
     // stream config should be valid
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
index d519a23029..bc02df8462 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.spi.stream;
 
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
 /**
  * A PartitionGroup is a group of partitions/shards that the same consumer 
should consume from.
  * This class contains all information which describes the latest state of a 
partition group.
@@ -36,6 +39,7 @@ package org.apache.pinot.spi.stream;
 public class PartitionGroupConsumptionStatus {
 
   private final int _partitionGroupId;
+  private final int _streamPartitionGroupId;
   private int _sequenceNumber;
   private StreamPartitionMsgOffset _startOffset;
   private StreamPartitionMsgOffset _endOffset;
@@ -44,6 +48,7 @@ public class PartitionGroupConsumptionStatus {
   public PartitionGroupConsumptionStatus(int partitionGroupId, int 
sequenceNumber, StreamPartitionMsgOffset startOffset,
       StreamPartitionMsgOffset endOffset, String status) {
     _partitionGroupId = partitionGroupId;
+    _streamPartitionGroupId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
     _sequenceNumber = sequenceNumber;
     _startOffset = startOffset;
     _endOffset = endOffset;
@@ -54,6 +59,10 @@ public class PartitionGroupConsumptionStatus {
     return _partitionGroupId;
   }
 
+  public int getStreamPartitionGroupId() {
+    return _streamPartitionGroupId;
+  }
+
   public int getSequenceNumber() {
     return _sequenceNumber;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 98094b9e88..158e28ce72 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -18,33 +18,35 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups 
of the stream,
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups 
of the streams,
  * using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
-  private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
-  private final StreamConfig _streamConfig;
+  private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
+  private final List<StreamConfig> _streamConfigs;
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
-  private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
-  private final String _topicName;
+  private final List<String> _topicNames;
 
-  public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
+  public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
-    _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
-    _topicName = streamConfig.getTopicName();
-    _streamConfig = streamConfig;
+    _topicNames = 
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+    _streamConfigs = streamConfigs;
     _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+    _newPartitionGroupMetadataList = new ArrayList<>();
   }
 
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -63,25 +65,43 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
   @Override
   public Boolean call()
       throws Exception {
-    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
-            + _streamConfig.getTableNameWithType() + "-" + _topicName;
-    try (
-        StreamMetadataProvider streamMetadataProvider = 
_streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-      _newPartitionGroupMetadataList = 
streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig,
-          _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000);
-      if (_exception != null) {
-        // We had at least one failure, but succeeded now. Log an info
-        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", _topicName);
+    _newPartitionGroupMetadataList.clear();
+    for (int i = 0; i < _streamConfigs.size(); i++) {
+      String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + 
"-"
+          + _streamConfigs.get(i).getTableNameWithType() + "-" + 
_topicNames.get(i);
+      StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
+      final int index = i;
+      List<PartitionGroupConsumptionStatus> 
topicPartitionGroupConsumptionStatusList =
+          _partitionGroupConsumptionStatusList.stream()
+              .filter(partitionGroupConsumptionStatus ->
+                  
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+                      partitionGroupConsumptionStatus.getPartitionGroupId()) 
== index)
+              .collect(Collectors.toList());
+      try (
+          StreamMetadataProvider streamMetadataProvider =
+              streamConsumerFactory.createStreamMetadataProvider(clientId)) {
+        
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
+            _streamConfigs.get(i),
+            topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000).stream().map(
+            metadata -> new PartitionGroupMetadata(
+                IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
+                    metadata.getPartitionGroupId(), index),
+                metadata.getStartOffset())).collect(Collectors.toList())
+        );
+        if (_exception != null) {
+          // We had at least one failure, but succeeded now. Log an info
+          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", _topicNames.get(i));
+        }
+      } catch (TransientConsumerException e) {
+        LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", _topicNames.get(i), e);
+        _exception = e;
+        return Boolean.FALSE;
+      } catch (Exception e) {
+        LOGGER.warn("Could not get partition count for topic {}", 
_topicNames.get(i), e);
+        _exception = e;
+        throw e;
       }
-      return Boolean.TRUE;
-    } catch (TransientConsumerException e) {
-      LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", _topicName, e);
-      _exception = e;
-      return Boolean.FALSE;
-    } catch (Exception e) {
-      LOGGER.warn("Could not get partition count for topic {}", _topicName, e);
-      _exception = e;
-      throw e;
     }
+    return Boolean.TRUE;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 39d061473e..e52610dd67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -223,7 +223,7 @@ public class StreamConfig {
     return _serverUploadToDeepStore;
   }
 
-  private double extractFlushThresholdVarianceFraction(Map<String, String> 
streamConfigMap) {
+  public static double extractFlushThresholdVarianceFraction(Map<String, 
String> streamConfigMap) {
     String key = StreamConfigProperties.FLUSH_THRESHOLD_VARIANCE_FRACTION;
     String flushThresholdVarianceFractionStr = streamConfigMap.get(key);
     if (flushThresholdVarianceFractionStr != null) {
@@ -245,7 +245,7 @@ public class StreamConfig {
     }
   }
 
-  private long extractFlushThresholdSegmentSize(Map<String, String> 
streamConfigMap) {
+  public static long extractFlushThresholdSegmentSize(Map<String, String> 
streamConfigMap) {
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
     String flushThresholdSegmentSizeStr = streamConfigMap.get(key);
     if (flushThresholdSegmentSizeStr == null) {
@@ -264,7 +264,7 @@ public class StreamConfig {
     }
   }
 
-  protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) 
{
+  public static int extractFlushThresholdRows(Map<String, String> 
streamConfigMap) {
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS;
     String flushThresholdRowsStr = streamConfigMap.get(key);
     if (flushThresholdRowsStr == null) {
@@ -288,7 +288,7 @@ public class StreamConfig {
     }
   }
 
-  protected int extractFlushThresholdSegmentRows(Map<String, String> 
streamConfigMap) {
+  public static int extractFlushThresholdSegmentRows(Map<String, String> 
streamConfigMap) {
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS;
     String flushThresholdSegmentRowsStr = streamConfigMap.get(key);
     if (flushThresholdSegmentRowsStr != null) {
@@ -302,7 +302,7 @@ public class StreamConfig {
     }
   }
 
-  protected long extractFlushThresholdTimeMillis(Map<String, String> 
streamConfigMap) {
+  public static long extractFlushThresholdTimeMillis(Map<String, String> 
streamConfigMap) {
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME;
     String flushThresholdTimeStr = streamConfigMap.get(key);
     if (flushThresholdTimeStr == null) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 812b7b8e0f..a8c4d22cc3 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -59,7 +59,7 @@ public abstract class StreamConsumerFactory {
    */
   public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
-    return createPartitionLevelConsumer(clientId, 
partitionGroupConsumptionStatus.getPartitionGroupId());
+    return createPartitionLevelConsumer(clientId, 
partitionGroupConsumptionStatus.getStreamPartitionGroupId());
   }
 
   @Deprecated
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 85bb2801a1..052993a6d0 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -81,7 +81,7 @@ public interface StreamMetadataProvider extends Closeable {
     // If partition group is still in progress, this value will be null
     for (PartitionGroupConsumptionStatus 
currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) {
       newPartitionGroupMetadataList.add(
-          new 
PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(),
+          new 
PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(),
               currentPartitionGroupConsumptionStatus.getEndOffset()));
     }
     // Add PartitionGroupMetadata for new partitions
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 2aeba4160b..81e2d9655a 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
@@ -46,15 +48,100 @@ public final class IngestionConfigUtils {
   private static final int DEFAULT_PUSH_ATTEMPTS = 5;
   private static final int DEFAULT_PUSH_PARALLELISM = 1;
   private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
+  // For partitions from different topics, we pad them with an offset to avoid 
collision. The offset is far higher
+  // than the typical max number of partitions on a stream (e.g. 512).
+  public static final int PARTITION_PADDING_OFFSET = 10000;
+  public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
+      "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
+  public static final String STREAM_TYPE = "streamType";
+  public static final String STREAM_CONSUMER_FACTORY_CLASS = 
"stream.consumer.factory.class";
 
   /**
    * Fetches the streamConfig from the given realtime table.
    * First, the ingestionConfigs->stream->streamConfigs will be checked.
    * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
    * @param tableConfig realtime table config
-   * @return streamConfigs map
+   * @return streamConfigs List of maps
    */
-  public static Map<String, String> getStreamConfigMap(TableConfig 
tableConfig) {
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      List<Map<String, String>> streamConfigMaps =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+      Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have 
at least 1 stream");
+      /*
+      Apply the following checks if there are multiple streamConfigs
+      1. Check if all streamConfigs have the same stream type. TODO: remove 
this limitation once we've tested it
+      2. Ensure segment flush parameters consistent across all streamConfigs. 
We need this because Pinot is predefining
+      the values before fetching stream partition info from stream. At the 
construction time, we don't know the value
+      extracted from a streamConfig would be applied to which segment.
+      TODO: remove this limitation once we've refactored the code and 
supported it.
+       */
+      Map<String, String> firstStreamConfigMap = streamConfigMaps.get(0);
+      for (int i = 1; i < streamConfigMaps.size(); i++) {
+        Map<String, String> map = streamConfigMaps.get(i);
+        Preconditions.checkNotNull(map.get(STREAM_TYPE),
+            "streamType must be defined for all streamConfigs for REALTIME 
table: %s", tableNameWithType);
+        Preconditions.checkState(StringUtils.equals(map.get(STREAM_TYPE), 
firstStreamConfigMap.get(STREAM_TYPE))
+                && StreamConfig.extractFlushThresholdRows(map) == 
StreamConfig.extractFlushThresholdRows(
+            firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdTimeMillis(map) == 
StreamConfig.extractFlushThresholdTimeMillis(
+            firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdVarianceFraction(map)
+                == 
StreamConfig.extractFlushThresholdVarianceFraction(firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdSegmentSize(map) == 
StreamConfig.extractFlushThresholdSegmentSize(
+            firstStreamConfigMap)
+                && StreamConfig.extractFlushThresholdSegmentRows(map) == 
StreamConfig.extractFlushThresholdSegmentRows(
+            firstStreamConfigMap),
+            "All streamConfigs must have the same stream type and identical 
flush threshold settings for REALTIME table: %s", tableNameWithType);
+      }
+      return streamConfigMaps;
+    }
+    if (tableConfig.getIndexingConfig() != null && 
tableConfig.getIndexingConfig().getStreamConfigs() != null) {
+      return Arrays.asList(tableConfig.getIndexingConfig().getStreamConfigs());
+    }
+    throw new IllegalStateException("Could not find streamConfigs for REALTIME 
table: " + tableNameWithType);
+  }
+
+  /**
+   * Getting the Pinot segment level partition id from the stream partition id.
+   * @param partitionId the partition group id from the stream
+   * @param index the index of the StreamConfig in the list of StreamConfigs
+   * @return the Pinot segment level partition id
+   */
+  public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, 
int index) {
+    return index * PARTITION_PADDING_OFFSET + partitionId;
+  }
+
+  /**
+   * Getting the Stream partition id from the Pinot segment partition id.
+   * @param partitionId the segment partition group id on Pinot
+   * @return the partition id on the stream
+   */
+  public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) {
+    return partitionId % PARTITION_PADDING_OFFSET;
+  }
+
+  /**
+   * Getting the StreamConfig index of StreamConfigs list from the Pinot 
segment partition id.
+   * @param partitionId the segment partition group id on Pinot
+   * @return the index of the StreamConfig in the StreamConfigs list
+   */
+  public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) {
+    return partitionId / PARTITION_PADDING_OFFSET;
+  }
+
+  /**
+   * Fetches the streamConfig from the list of streamConfigs according to the 
partitionGroupId.
+   * @param tableConfig realtime table config
+   * @param partitionGroupId partitionGroupId
+   * @return streamConfig map
+   */
+  public static Map<String, String> getStreamConfigMapWithPartitionGroupId(
+      TableConfig tableConfig, int partitionGroupId) {
     String tableNameWithType = tableConfig.getTableName();
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
@@ -63,10 +150,13 @@ public final class IngestionConfigUtils {
         && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
       List<Map<String, String>> streamConfigMaps =
           
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
-      Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream 
supported per table");
-      streamConfigMap = streamConfigMaps.get(0);
+      Preconditions.checkState(
+          streamConfigMaps.size() > partitionGroupId / 
PARTITION_PADDING_OFFSET,
+          "Table does not have enough streams configured");
+      streamConfigMap = streamConfigMaps.get(partitionGroupId / 
PARTITION_PADDING_OFFSET);
     }
-    if (streamConfigMap == null && tableConfig.getIndexingConfig() != null) {
+    if (partitionGroupId < PARTITION_PADDING_OFFSET
+        && streamConfigMap == null && tableConfig.getIndexingConfig() != null) 
{
       streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
     }
     if (streamConfigMap == null) {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index b2b4c87b29..1e9517a330 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
@@ -44,7 +45,9 @@ public class IngestionConfigUtilsTest {
   public void testGetStreamConfigMap() {
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      IngestionConfigUtils.getStreamConfigMaps(tableConfig);
+      Assert.fail("Should fail for OFFLINE table");
+      // NOTE(review): duplicate call/fail pair — unreachable after the Assert.fail above; remove on next pass
       Assert.fail("Should fail for OFFLINE table");
     } catch (IllegalStateException e) {
       // expected
@@ -58,7 +61,7 @@ public class IngestionConfigUtilsTest {
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
     tableConfig.setIngestionConfig(ingestionConfig);
-    Map<String, String> actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    Map<String, String> actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
 
@@ -69,30 +72,30 @@ public class IngestionConfigUtilsTest {
     IndexingConfig indexingConfig = new IndexingConfig();
     indexingConfig.setStreamConfigs(deprecatedStreamConfigMap);
     tableConfig.setIndexingConfig(indexingConfig);
-    actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
 
-    // fail if multiple found
+    // Able to get multiple stream configs
     ingestionConfig.setStreamIngestionConfig(
         new StreamIngestionConfig(Arrays.asList(streamConfigMap, 
deprecatedStreamConfigMap)));
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
-      Assert.fail("Should fail for multiple stream configs");
+      List<Map<String, String>> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
+      Assert.assertEquals(streamConfigs.size(), 2);
     } catch (IllegalStateException e) {
       // expected
     }
 
     // get from indexing config
     tableConfig.setIngestionConfig(null);
-    actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
     Assert.assertEquals(actualStreamConfigsMap.size(), 2);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "foo");
 
     // fail if found nowhere
     tableConfig.setIndexingConfig(new IndexingConfig());
     try {
-      IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      IngestionConfigUtils.getStreamConfigMaps(tableConfig);
       Assert.fail("Should fail for no stream config found");
     } catch (IllegalStateException e) {
       // expected


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to