itschrispeck commented on code in PR #13790: URL: https://github.com/apache/pinot/pull/13790#discussion_r1849154551
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1294,7 +1315,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); - createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs, Review Comment: Can we use the partitionId to choose the correct streamConfig? Or we'd need to document that segment flush settings are only used from the first streamConfig in the table config's list (though I feel different flush settings per stream will eventually be a future requirement) ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -929,15 +949,16 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = offsetsHaveToChange ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfig); - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria Review Comment: Does it make sense to add a precondition to check this? 
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java: ########## @@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { return streamConfigMap; } + /** + * Fetches the streamConfig from the given realtime table. + * First, the ingestionConfigs->stream->streamConfigs will be checked. + * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). + * @param tableConfig realtime table config + * @return streamConfigs List of maps + */ + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { Review Comment: Can we remove the old method if it is no longer used? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -255,12 +255,12 @@ public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusL // Create a {@link PartitionGroupConsumptionStatus} for each latest segment StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); LLCSegmentName llcSegmentName = entry.getValue(); SegmentZKMetadata segmentZKMetadata = - getSegmentZKMetadata(streamConfig.getTableNameWithType(), llcSegmentName.getSegmentName()); + getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName()); Review Comment: nit: `idealState.getId()` instead of `.get(0)`? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java: ########## @@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt } } + public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, + ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) { + _realtimeTableName = realtimeTableName; + _controllerMetrics = controllerMetrics; + _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); + _streamPartitionMsgOffsetFactory = + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); Review Comment: I think this breaks when mixing streams that do not use the same offset factory type, e.g. Kinesis and Kafka. (There are many occurrences of this specific offset-factory case; I won't mark them all.) We could add a unit test, or shall we add a TODO for them since we can't easily test end-to-end internally? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java: ########## @@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt } } + public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, Review Comment: The old constructor is no longer used, can we remove it and update the tests? 
########## pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java: ########## @@ -18,33 +18,44 @@ */ package org.apache.pinot.spi.stream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream, + * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams, * using the {@link StreamMetadataProvider} */ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); private List<PartitionGroupMetadata> _newPartitionGroupMetadataList; - private final StreamConfig _streamConfig; + private final List<StreamConfig> _streamConfigs; private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; - private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; - private final String _topicName; + private final List<String> _topicNames; + + public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); + _streamConfigs = streamConfigs; + _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; + _newPartitionGroupMetadataList = new ArrayList<>(); + } public PartitionGroupMetadataFetcher(StreamConfig streamConfig, Review Comment: Similar here, let's remove the unused constructor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org