Jackie-Jiang commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1849147395


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {

Review Comment:
   Add deprecated annotation to the old `getStreamConfigMap()` if we are not 
removing it



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      List<Map<String, String>> streamConfigMaps =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+      Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have 
at least 1 stream");
+      // For now, with multiple topics, we only support same type of stream 
(e.g. Kafka)

Review Comment:
   What is the reason for this limitation? Some comments explaining this would 
be good.
   Only apply this check when there are multiple streams to match the current 
behavior



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      List<Map<String, String>> streamConfigMaps =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+      Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have 
at least 1 stream");

Review Comment:
   (nit)
   ```suggestion
         Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have 
at least 1 stream");
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java:
##########
@@ -99,4 +99,28 @@ public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamC
       throw new RuntimeException(fetcherException);
     }
   }
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
+   * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
+   * In particular, this method is used to fetch from multiple stream topics.
+   * @param streamConfigs
+   * @param partitionGroupConsumptionStatusList
+   * @return
+   */
+  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,

Review Comment:
   Deprecate the old method or remove it. Please also clean up all usages of 
the old one



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -569,10 +569,13 @@ protected void configure() {
     _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
       TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
       if (tableConfig != null) {
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        List<Map<String, String>> streamConfigMaps = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
         try {
-          
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
 "kafka"),
-              streamConfigMap);
+          for (Map<String, String> streamConfigMap : streamConfigMaps) {
+            StreamConfig.validateConsumerType(
+                
streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),

Review Comment:
   (format) Not comply to [Pinot 
Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to