lnbest0707-uber commented on code in PR #13790: URL: https://github.com/apache/pinot/pull/13790#discussion_r1853045190
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { Review Comment: Removed the old one. ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). 
+ * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); Review Comment: Fixed ########## pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java: ########## @@ -569,10 +569,13 @@ protected void configure() { _helixResourceManager.getAllRealtimeTables().forEach(rt -> { TableConfig tableConfig = _helixResourceManager.getTableConfig(rt); if (tableConfig != null) { - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); try { - StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), - streamConfigMap); + for (Map<String, String> streamConfigMap : streamConfigMaps) { + StreamConfig.validateConsumerType( + streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), Review Comment: Not sure how this works, but the mvn check style could pass. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java: ########## @@ -99,4 +99,28 @@ public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamC throw new RuntimeException(fetcherException); } } + + /** + * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, + * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. + * In particular, this method is used to fetch from multiple stream topics. + * @param streamConfigs + * @param partitionGroupConsumptionStatusList + * @return + */ + public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs, Review Comment: Removed ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java: ########## @@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt } } + public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, Review Comment: Removed ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java: ########## @@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt } } + public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, + ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) { + _realtimeTableName = realtimeTableName; + _controllerMetrics = controllerMetrics; + _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); + _streamPartitionMsgOffsetFactory = + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); Review Comment: Will add an enforcement check when we fetch the streamConfigs to require that they are all the same for now. 
In the long term, we need to redefine the structure of streamConfig for this usage. ########## pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java: ########## @@ -18,33 +18,44 @@ */ package org.apache.pinot.spi.stream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream, + * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams, * using the {@link StreamMetadataProvider} */ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); private List<PartitionGroupMetadata> _newPartitionGroupMetadataList; - private final StreamConfig _streamConfig; + private final List<StreamConfig> _streamConfigs; private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; - private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; - private final String _topicName; + private final List<String> _topicNames; + + public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); + _streamConfigs = streamConfigs; + _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; + _newPartitionGroupMetadataList = new ArrayList<>(); + } public PartitionGroupMetadataFetcher(StreamConfig streamConfig, Review Comment: removed ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static 
Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have at least 1 stream"); + // For now, with multiple topics, we only support same type of stream (e.g. Kafka) Review Comment: Added detailed explanations. Basically, this is because we do not have the resources to cover testing of other stream types. ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { Review Comment: Removed -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org