Jackie-Jiang commented on code in PR #13790: URL: https://github.com/apache/pinot/pull/13790#discussion_r1849147395
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { Review Comment: Add deprecated annotation to the old `getStreamConfigMap()` if we are not removing it ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); + // For now, with multiple topics, we only support same type of stream (e.g. Kafka) Review Comment: What is the reason for this limitation? Some comments explaining this would be good. Only apply this check when there are multiple streams to match the current behavior ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); Review Comment: (nit) ```suggestion Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have at least 1 stream"); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java: ########## @@ -99,4 +99,28 @@ public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamC throw new RuntimeException(fetcherException); } } + + /** + * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, + * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. + * In particular, this method is used to fetch from multiple stream topics. + * @param streamConfigs + * @param partitionGroupConsumptionStatusList + * @return + */ + public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs, Review Comment: Deprecate the old method or remove it. Please also clean up all usages of the old one ########## pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java: ########## @@ -569,10 +569,13 @@ protected void configure() { _helixResourceManager.getAllRealtimeTables().forEach(rt -> { TableConfig tableConfig = _helixResourceManager.getTableConfig(rt); if (tableConfig != null) { - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); try { - StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), - streamConfigMap); + for (Map<String, String> streamConfigMap : streamConfigMaps) { + StreamConfig.validateConsumerType( + streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), Review Comment: (format) Not comply to [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org