lnbest0707-uber commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1853045190


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {

Review Comment:
   Removed the old one.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      List<Map<String, String>> streamConfigMaps =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+      Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have 
at least 1 stream");

Review Comment:
   Fixed



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -569,10 +569,13 @@ protected void configure() {
     _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
       TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
       if (tableConfig != null) {
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        List<Map<String, String>> streamConfigMaps = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
         try {
-          
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
 "kafka"),
-              streamConfigMap);
+          for (Map<String, String> streamConfigMap : streamConfigMaps) {
+            StreamConfig.validateConsumerType(
+                
streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),

Review Comment:
   Not sure how this works, but the Maven checkstyle check passes.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java:
##########
@@ -99,4 +99,28 @@ public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamC
       throw new RuntimeException(fetcherException);
     }
   }
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
+   * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
+   * In particular, this method is used to fetch from multiple stream topics.
+   * @param streamConfigs
+   * @param partitionGroupConsumptionStatusList
+   * @return
+   */
+  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,

Review Comment:
   Removed



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String 
realtimeTableName, ZkHelixPropertySt
     }
   }
 
+  public MissingConsumingSegmentFinder(String realtimeTableName, 
ZkHelixPropertyStore<ZNRecord> propertyStore,

Review Comment:
   Removed



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String 
realtimeTableName, ZkHelixPropertySt
     }
   }
 
+  public MissingConsumingSegmentFinder(String realtimeTableName, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
+    _realtimeTableName = realtimeTableName;
+    _controllerMetrics = controllerMetrics;
+    _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, 
controllerMetrics);
+    _streamPartitionMsgOffsetFactory =
+        
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

Review Comment:
   Will add an enforcement check when we fetch the streamConfigs to require 
them to be the same for now.
   In the long term, we need to redefine the structure of streamConfig for 
this usage.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -18,33 +18,44 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups 
of the stream,
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups 
of the streams,
  * using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
   private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
-  private final StreamConfig _streamConfig;
+  private final List<StreamConfig> _streamConfigs;
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
-  private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
-  private final String _topicName;
+  private final List<String> _topicNames;
+
+  public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
+    _topicNames = 
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+    _streamConfigs = streamConfigs;
+    _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+    _newPartitionGroupMetadataList = new ArrayList<>();
+  }
 
   public PartitionGroupMetadataFetcher(StreamConfig streamConfig,

Review Comment:
   removed



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      List<Map<String, String>> streamConfigMaps =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+      Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have 
at least 1 stream");
+      // For now, with multiple topics, we only support same type of stream 
(e.g. Kafka)

Review Comment:
   Added detailed explanations. Basically, it is because we do not have the 
resources to cover the testing of other stream types.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> 
getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is 
deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig 
tableConfig) {

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to