This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 73abb21c23 Add multi stream ingestion support (#13790) 73abb21c23 is described below commit 73abb21c23d01d62f26e8f2a1eb8fddcd0f75aa2 Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com> AuthorDate: Thu Dec 19 10:39:38 2024 -0800 Add multi stream ingestion support (#13790) * Add multi stream ingestion support * Fix UT * Fix issues, rebase and resolve comments * Resolve comments * Fix style * Ensure transient exceptions do not prevent creating new consuming segments Summary: Ensure transient exceptions do not prevent creating new consuming segments. If some exception is hit, attempt to reconcile any successful fetches with partition group metadata. This ensures consuming partitions are not dropped, and attempts to add and new partitions discovered successfully. Test Plan: After deployment, despite still some `TransientConsumerException`, no new missing consuming segments appear {F1002071843} {F1002071523} Reviewers: gaoxin, tingchen Reviewed By: gaoxin JIRA Issues: EVA-8951 Differential Revision: https://code.uberinternal.com/D15748639 * Resolve comments for optimizing java doc * Edit doc/comment * Remove unrelated files * Rebase and resolve conflicts * Take the metadata fetch time change from the HEAD * Resolve conflicts --------- Co-authored-by: Christopher Peck <p...@uber.com> --- .../pinot/controller/BaseControllerStarter.java | 12 +- .../controller/helix/SegmentStatusChecker.java | 8 +- .../helix/core/PinotTableIdealStateBuilder.java | 12 +- .../realtime/MissingConsumingSegmentFinder.java | 18 ++- .../realtime/PinotLLCRealtimeSegmentManager.java | 132 +++++++++++++-------- .../core/realtime/SegmentCompletionManager.java | 4 +- .../RealtimeSegmentValidationManager.java | 12 +- .../PinotLLCRealtimeSegmentManagerTest.java | 52 ++++++-- .../provider/DefaultTableDataManagerProvider.java | 2 +- .../realtime/RealtimeSegmentDataManager.java | 22 +++- 
.../manager/realtime/SegmentCommitterFactory.java | 2 +- .../segment/local/utils/TableConfigUtils.java | 24 ++-- .../segment/local/utils/TableConfigUtilsTest.java | 5 +- .../stream/PartitionGroupConsumptionStatus.java | 9 ++ .../spi/stream/PartitionGroupMetadataFetcher.java | 74 +++++++----- .../org/apache/pinot/spi/stream/StreamConfig.java | 10 +- .../pinot/spi/stream/StreamConsumerFactory.java | 2 +- .../pinot/spi/stream/StreamMetadataProvider.java | 2 +- .../pinot/spi/utils/IngestionConfigUtils.java | 100 +++++++++++++++- .../pinot/spi/utils/IngestionConfigUtilsTest.java | 19 +-- 20 files changed, 367 insertions(+), 154 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 342413d355..0326f97a7b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -257,7 +257,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { // This executor service is used to do async tasks from multiget util or table rebalancing. _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d"); _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), - "tenant-rebalance-thread-%d"); + "tenant-rebalance-thread-%d"); _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } @@ -272,7 +272,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(); return (numThreadPool <= 0) ? 
Executors.newCachedThreadPool(threadFactory) - : Executors.newFixedThreadPool(numThreadPool, threadFactory); + : Executors.newFixedThreadPool(numThreadPool, threadFactory); } private void inferHostnameIfNeeded(ControllerConf config) { @@ -577,10 +577,12 @@ public abstract class BaseControllerStarter implements ServiceStartable { _helixResourceManager.getAllRealtimeTables().forEach(rt -> { TableConfig tableConfig = _helixResourceManager.getTableConfig(rt); if (tableConfig != null) { - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); try { - StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), - streamConfigMap); + for (Map<String, String> streamConfigMap : streamConfigMaps) { + StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), + streamConfigMap); + } } catch (Exception e) { existingHlcTables.add(rt); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index c9a48022c0..1a5f542dd7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -419,10 +420,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh numInvalidEndTime); if (tableType == TableType.REALTIME && tableConfig != null) { - StreamConfig 
streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, - streamConfig).findAndEmitMetrics(idealState); + streamConfigs).findAndEmitMetrics(idealState); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 23a115417f..8895d9df50 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -54,6 +54,7 @@ public class PinotTableIdealStateBuilder { /** * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. + * In particular, this method can also be used to fetch from multiple stream topics. * * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: * @@ -79,23 +80,24 @@ public class PinotTableIdealStateBuilder { * the collection of shards in partition group 1, should remain unchanged in the response, * whereas shards 3,4 can be added to new partition groups if needed. * - * @param streamConfig the streamConfig from the tableConfig + * @param streamConfigs the List of streamConfig from the tableConfig * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current * partition groups. 
* The size of this list is equal to the number of partition groups, * and is created using the latest segment zk metadata. */ - public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs, List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); } catch (Exception e) { Exception fetcherException = partitionGroupMetadataFetcher.getException(); - LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), - streamConfig.getTableNameWithType(), fetcherException); + LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", + streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b), + streamConfigs.get(0).getTableNameWithType(), fetcherException); throw new RuntimeException(fetcherException); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index f4192a5a1a..5fe2ffe6d6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -24,7 +24,9 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; 
import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.helix.AccessOption; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -65,25 +67,29 @@ public class MissingConsumingSegmentFinder { private ControllerMetrics _controllerMetrics; public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, - ControllerMetrics controllerMetrics, StreamConfig streamConfig) { + ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); _streamPartitionMsgOffsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); // create partition group id to largest stream offset map _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); - streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); + streamConfigs.stream().map(streamConfig -> { + streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); + return streamConfig; + }); try { - PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) + PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList()) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); } catch (Exception e) { - LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. " + LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. 
" + "Continue finding missing consuming segment only with ideal state information.", - streamConfig.getTopicName(), streamConfig.getTableNameWithType()); + streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).collect(Collectors.toList()), + streamConfigs.get(0).getTableNameWithType()); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 44ca01812a..4ba7cd2208 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -232,7 +232,7 @@ public class PinotLLCRealtimeSegmentManager { * for latest segment of each partition group. */ public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusList(IdealState idealState, - StreamConfig streamConfig) { + List<StreamConfig> streamConfigs) { List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList = new ArrayList<>(); // From all segment names in the ideal state, find unique partition group ids and their latest segment @@ -257,12 +257,12 @@ public class PinotLLCRealtimeSegmentManager { // Create a {@link PartitionGroupConsumptionStatus} for each latest segment StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); LLCSegmentName llcSegmentName = entry.getValue(); SegmentZKMetadata segmentZKMetadata = - getSegmentZKMetadata(streamConfig.getTableNameWithType(), 
llcSegmentName.getSegmentName()); + getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName()); PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(), offsetFactory.create(segmentZKMetadata.getStartOffset()), @@ -322,11 +322,12 @@ public class PinotLLCRealtimeSegmentManager { _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); int numPartitionGroups = newPartitionGroupMetadataList.size(); int numReplicas = getNumReplicas(tableConfig, instancePartitions); @@ -339,7 +340,8 @@ public class PinotLLCRealtimeSegmentManager { Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { String segmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, + setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs, + instancePartitions, numPartitionGroups, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, instancePartitionsMap); @@ -548,29 +550,16 @@ public class PinotLLCRealtimeSegmentManager { long 
startTimeNs2 = System.nanoTime(); String newConsumingSegmentName = null; if (!isTablePaused(idealState)) { - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); - Set<Integer> partitionIds; - try { - partitionIds = getPartitionIds(streamConfig); - } catch (Exception e) { - LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. " - + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString()); - // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed. - // We don't need to read partition group metadata for other partition groups. - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - getPartitionGroupConsumptionStatusList(idealState, streamConfig); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); - partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId) - .collect(Collectors.toSet()); - } + List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); + Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState); if (partitionIds.contains(committingSegmentPartitionGroupId)) { String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, + 
createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas); newConsumingSegmentName = newLLCSegment.getSegmentName(); @@ -764,7 +753,7 @@ public class PinotLLCRealtimeSegmentManager { return commitTimeoutMS; } TableConfig tableConfig = getTableConfig(realtimeTableName); - final Map<String, String> streamConfigs = IngestionConfigUtils.getStreamConfigMap(tableConfig); + final Map<String, String> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); if (streamConfigs.containsKey(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS)) { final String commitTimeoutSecondsStr = streamConfigs.get(StreamConfigProperties.SEGMENT_COMMIT_TIMEOUT_SECONDS); try { @@ -793,15 +782,49 @@ public class PinotLLCRealtimeSegmentManager { } } + @VisibleForTesting + Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) { + Set<Integer> partitionIds = new HashSet<>(); + boolean allPartitionIdsFetched = true; + for (int i = 0; i < streamConfigs.size(); i++) { + final int index = i; + try { + partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream() + .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + .collect(Collectors.toSet())); + } catch (Exception e) { + allPartitionIdsFetched = false; + LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e); + } + } + + // If it is failing to fetch partition ids from stream (usually transient due to stream metadata service outage), + // we need to use the existing partition information from ideal state to keep same ingestion behavior. 
+ if (!allPartitionIdsFetched) { + LOGGER.info( + "Fetch partition ids from Stream incomplete, merge fetched partitionIds with partition group metadata " + + "for: {}", idealState.getId()); + // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed. + // We don't need to read partition group metadata for other partition groups. + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + partitionIds.addAll(newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId) + .collect(Collectors.toSet())); + } + return partitionIds; + } + /** * Fetches the latest state of the PartitionGroups for the stream * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, * it will be skipped from the result */ @VisibleForTesting - List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { - return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, + return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); } @@ -918,7 +941,7 @@ public class PinotLLCRealtimeSegmentManager { * IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE. * If so, it should create a new CONSUMING segment for the partition. 
*/ - public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, + public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs, OffsetCriteria offsetCriteria) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); @@ -932,15 +955,16 @@ public class PinotLLCRealtimeSegmentManager { List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = offsetsHaveToChange ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfig); - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); // Read the smallest offset when a new partition is detected - streamConfig.setOffsetCriteria( - offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(offsetsHaveToChange + ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); - streamConfig.setOffsetCriteria(originalOffsetCriteria); - return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); + return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, offsetCriteria); } else { LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", @@ -1160,8 +1184,8 @@ public class PinotLLCRealtimeSegmentManager { * TODO: split this method into multiple smaller methods */ @VisibleForTesting - IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState, - List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) { + IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConfig> streamConfigs, + IdealState idealState, List<PartitionGroupMetadata> partitionGroupMetadataList, OffsetCriteria offsetCriteria) { String realtimeTableName = tableConfig.getTableName(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -1175,7 +1199,7 @@ public class PinotLLCRealtimeSegmentManager { Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); // Get the latest segment ZK metadata for each partition Map<Integer, 
SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); @@ -1240,7 +1264,7 @@ public class PinotLLCRealtimeSegmentManager { CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0); - createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, + createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, segmentAssignment, instancePartitionsMap); @@ -1274,7 +1298,7 @@ public class PinotLLCRealtimeSegmentManager { // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset. if (partitionIdToSmallestOffset == null) { - partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); + partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs); } // Do not create new CONSUMING segment when the stream partition has reached end of life. 
@@ -1288,7 +1312,7 @@ public class PinotLLCRealtimeSegmentManager { selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning - createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs, partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset); } else { @@ -1297,7 +1321,7 @@ public class PinotLLCRealtimeSegmentManager { selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); - createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, + createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs, partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, instancePartitionsMap, startOffset); } @@ -1344,7 +1368,8 @@ public class PinotLLCRealtimeSegmentManager { int partitionId = partitionGroupMetadata.getPartitionGroupId(); if (!latestSegmentZKMetadataMap.containsKey(partitionId)) { String newSegmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, + setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs, + instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap); @@ -1372,15 +1397,18 @@ public class PinotLLCRealtimeSegmentManager { instancePartitionsMap); } - private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); - streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List<PartitionGroupMetadata> partitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); - streamConfig.setOffsetCriteria(originalOffsetCriteria); + private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset( + List<StreamConfig> streamConfigs) { Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); - for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { - partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + for (StreamConfig streamConfig : streamConfigs) { + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List<PartitionGroupMetadata> partitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { + partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + } } return partitionGroupIdToSmallestOffset; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 63d302f929..5bb3f861d7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -102,7 +102,7 @@ public class SegmentCompletionManager { String rawTableName = 
llcSegmentName.getTableName(); TableConfig tableConfig = _segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)); StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0)); return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); } @@ -131,7 +131,7 @@ public class SegmentCompletionManager { TableConfig tableConfig = _segmentManager.getTableConfig(realtimeTableName); String factoryName = null; try { - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); factoryName = streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME); } catch (Exception e) { // If there is an exception, we default to the default factory. 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 88f1bc6ee6..dbe229ebc9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -104,14 +105,15 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType); return; } - StreamConfig streamConfig = - new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); if (context._runSegmentLevelValidation) { - runSegmentLevelValidation(tableConfig, streamConfig); + runSegmentLevelValidation(tableConfig); } if (shouldEnsureConsuming(tableNameWithType)) { - _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria); + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfigs, context._offsetCriteria); } } @@ -147,7 +149,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea return !isQuotaExceeded; } - private void 
runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) { + private void runSegmentLevelValidation(TableConfig tableConfig) { String realtimeTableName = tableConfig.getTableName(); List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 42bc697c75..dbe640d364 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -91,8 +91,8 @@ import org.testng.annotations.Test; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import static org.testng.Assert.*; @@ -274,7 +274,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList()); partitionGroupMetadataListWithout0.remove(0); segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout0; @@ -595,7 +595,7 @@ public class PinotLLCRealtimeSegmentManagerTest { */ // 1 reached end of shard. List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList()); partitionGroupMetadataListWithout1.remove(1); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1; // noop @@ -882,7 +882,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Expected } try { - segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null); + segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfigs, null); fail(); } catch (IllegalStateException e) { // Expected @@ -1217,6 +1217,36 @@ public class PinotLLCRealtimeSegmentManagerTest { assertEquals(numDeletedTmpSegments, 1); } + @Test + public void testGetPartitionIds() + throws Exception { + List<StreamConfig> streamConfigs = List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs()); + IdealState idealState = new IdealState("table"); + FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(); + segmentManager._numPartitions = 2; + + // Test empty ideal state + Set<Integer> partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState); + Assert.assertEquals(partitionIds.size(), 2); + partitionIds.clear(); + + // Simulate the case where getPartitionIds(StreamConfig) throws an exception (e.g. 
transient kafka connection issue) + PinotLLCRealtimeSegmentManager segmentManagerSpy = spy(FakePinotLLCRealtimeSegmentManager.class); + doThrow(new RuntimeException()).when(segmentManagerSpy).getPartitionIds(any(StreamConfig.class)); + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList = + List.of(new PartitionGroupConsumptionStatus(0, 12, new LongMsgOffset(123), new LongMsgOffset(234), "ONLINE"), + new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123), new LongMsgOffset(345), "ONLINE")); + doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy) + .getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + List<PartitionGroupMetadata> partitionGroupMetadataList = + List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)), + new PartitionGroupMetadata(1, new LongMsgOffset(345))); + doReturn(partitionGroupMetadataList).when(segmentManagerSpy) + .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList); + partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState); + Assert.assertEquals(partitionIds.size(), 2); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// @@ -1230,7 +1260,7 @@ public class PinotLLCRealtimeSegmentManagerTest { int _numReplicas; TableConfig _tableConfig; - StreamConfig _streamConfig; + List<StreamConfig> _streamConfigs; int _numInstances; InstancePartitions _consumingInstancePartitions; Map<String, SegmentZKMetadata> _segmentZKMetadataMap = new HashMap<>(); @@ -1258,8 +1288,8 @@ public class PinotLLCRealtimeSegmentManagerTest { _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas) .setStreamConfigs(streamConfigs).build(); - _streamConfig = - new StreamConfig(_tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + 
_streamConfigs = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).stream().map( + streamConfig -> new StreamConfig(_tableConfig.getTableName(), streamConfig)).collect(Collectors.toList()); } void makeConsumingInstancePartitions() { @@ -1277,8 +1307,8 @@ public class PinotLLCRealtimeSegmentManagerTest { } public void ensureAllPartitionsConsuming() { - ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null); + ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState, + getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList()), null); } @Override @@ -1358,7 +1388,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { if (_partitionGroupMetadataList != null) { return _partitionGroupMetadataList; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index fff6232943..36caa5b86a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -73,7 +73,7 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider } break; case REALTIME: - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); if 
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)) && StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) { throw new IllegalStateException(String.format("Table has enabled %s config. But the server has not " diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 684e1ffa53..380b358a84 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -282,7 +282,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31; private Thread _consumerThread; + // _partitionGroupId represents the Pinot's internal partition number which will eventually be used as part of + // segment name. + // _streamPatitionGroupId represents the partition number in the stream topic, which could be derived from the + // _partitionGroupId and identify which partition of the stream topic this consumer is consuming from. + // Note that in traditional single topic ingestion mode, those two concepts were identical which got separated + // in multi-topic ingestion mode. 
private final int _partitionGroupId; + private final int _streamPatitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; private final TransformPipeline _transformPipeline; @@ -1496,12 +1503,16 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); - _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + _partitionGroupId = llcSegmentName.getPartitionGroupId(); + _streamPatitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); + _streamConfig = new StreamConfig( + _tableNameWithType, + IngestionConfigUtils.getStreamConfigMaps(_tableConfig) + .get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId))); _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory(); String streamTopic = _streamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); - _partitionGroupId = llcSegmentName.getPartitionGroupId(); _partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(), _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()), @@ -1514,9 +1525,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { String clientIdSuffix = instanceDataManagerConfig != null ? 
instanceDataManagerConfig.getConsumerClientIdSuffix() : null; if (StringUtils.isNotBlank(clientIdSuffix)) { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId + "-" + clientIdSuffix; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPatitionGroupId + "-" + clientIdSuffix; } else { - _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId; + _clientId = _tableNameWithType + "-" + streamTopic + "-" + _streamPatitionGroupId; } _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; @@ -1832,7 +1843,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private void createPartitionMetadataProvider(String reason) { closePartitionMetadataProvider(); _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason); - _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId); + _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider( + _clientId, _streamPatitionGroupId); } private void updateIngestionMetrics(RowMetadata metadata) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 33a3b55654..4224019ab0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -47,7 +47,7 @@ public class SegmentCommitterFactory { _protocolHandler = protocolHandler; _tableConfig = tableConfig; _streamConfig = new StreamConfig(_tableConfig.getTableName(), - IngestionConfigUtils.getStreamConfigMap(_tableConfig)); + 
IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0)); _indexLoadingConfig = indexLoadingConfig; _serverMetrics = serverMetrics; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 387f69a442..141e0c280a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -169,15 +169,22 @@ public final class TableConfigUtils { // Only allow realtime tables with non-null stream.type and LLC consumer.type if (tableConfig.getTableType() == TableType.REALTIME) { - Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); + if (streamConfigMaps.size() > 1) { + Preconditions.checkArgument(!tableConfig.isUpsertEnabled(), + "Multiple stream configs are not supported for upsert tables"); + } + // TODO: validate stream configs in the map are identical in most fields StreamConfig streamConfig; - try { - // Validate that StreamConfig can be created - streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap); - } catch (Exception e) { - throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e); + for (Map<String, String> streamConfigMap : streamConfigMaps) { + try { + // Validate that StreamConfig can be created + streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap); + } catch (Exception e) { + throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e); + } + validateStreamConfig(streamConfig); } - validateStreamConfig(streamConfig); } validateTierConfigList(tableConfig.getTierConfigsList()); 
validateIndexingConfig(tableConfig.getIndexingConfig(), schema); @@ -390,7 +397,8 @@ public final class TableConfigUtils { Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()), "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided"); List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream is supported in REALTIME table"); + Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at least 1 stream in REALTIME table"); + // TODO: for multiple stream configs, validate them } // Filter config diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 98b0ba552c..72a17ee7d1 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -684,12 +684,11 @@ public class TableConfigUtilsTest { new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") .setIngestionConfig(ingestionConfig).build(); - // only 1 stream config allowed + // Multiple stream configs are allowed try { TableConfigUtils.validateIngestionConfig(tableConfig, null); - Assert.fail("Should fail for more than 1 stream config"); } catch (IllegalStateException e) { - // expected + Assert.fail("Multiple stream configs should be supported"); } // stream config should be valid diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java index d519a23029..bc02df8462 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.spi.stream; +import org.apache.pinot.spi.utils.IngestionConfigUtils; + + /** * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. * This class contains all information which describes the latest state of a partition group. @@ -36,6 +39,7 @@ package org.apache.pinot.spi.stream; public class PartitionGroupConsumptionStatus { private final int _partitionGroupId; + private final int _streamPartitionGroupId; private int _sequenceNumber; private StreamPartitionMsgOffset _startOffset; private StreamPartitionMsgOffset _endOffset; @@ -44,6 +48,7 @@ public class PartitionGroupConsumptionStatus { public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, String status) { _partitionGroupId = partitionGroupId; + _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); _sequenceNumber = sequenceNumber; _startOffset = startOffset; _endOffset = endOffset; @@ -54,6 +59,10 @@ public class PartitionGroupConsumptionStatus { return _partitionGroupId; } + public int getStreamPartitionGroupId() { + return _streamPartitionGroupId; + } + public int getSequenceNumber() { return _sequenceNumber; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 98094b9e88..158e28ce72 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -18,33 +18,35 @@ */ package org.apache.pinot.spi.stream; +import 
java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream, + * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams, * using the {@link StreamMetadataProvider} */ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); - private List<PartitionGroupMetadata> _newPartitionGroupMetadataList; - private final StreamConfig _streamConfig; + private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList; + private final List<StreamConfig> _streamConfigs; private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; - private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; - private final String _topicName; + private final List<String> _topicNames; - public PartitionGroupMetadataFetcher(StreamConfig streamConfig, + public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs, List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { - _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - _topicName = streamConfig.getTopicName(); - _streamConfig = streamConfig; + _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList()); + _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; + _newPartitionGroupMetadataList = new ArrayList<>(); } public List<PartitionGroupMetadata> getPartitionGroupMetadataList() { @@ -63,25 +65,43 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { @Override public Boolean call() throws Exception 
{ - String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" - + _streamConfig.getTableNameWithType() + "-" + _topicName; - try ( - StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _newPartitionGroupMetadataList = streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig, - _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000); - if (_exception != null) { - // We had at least one failure, but succeeded now. Log an info - LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicName); + _newPartitionGroupMetadataList.clear(); + for (int i = 0; i < _streamConfigs.size(); i++) { + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + + _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i); + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i)); + final int index = i; + List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList = + _partitionGroupConsumptionStatusList.stream() + .filter(partitionGroupConsumptionStatus -> + IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( + partitionGroupConsumptionStatus.getPartitionGroupId()) == index) + .collect(Collectors.toList()); + try ( + StreamMetadataProvider streamMetadataProvider = + streamConsumerFactory.createStreamMetadataProvider(clientId)) { + _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, + _streamConfigs.get(i), + topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000).stream().map( + metadata -> new PartitionGroupMetadata( + IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId( + metadata.getPartitionGroupId(), index), + metadata.getStartOffset())).collect(Collectors.toList()) + ); + if (_exception != null) { + // We had at least one failure, but succeeded now. 
Log an info + LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic {}", _topicNames.get(i)); + } + } catch (TransientConsumerException e) { + LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicNames.get(i), e); + _exception = e; + return Boolean.FALSE; + } catch (Exception e) { + LOGGER.warn("Could not get partition count for topic {}", _topicNames.get(i), e); + _exception = e; + throw e; } - return Boolean.TRUE; - } catch (TransientConsumerException e) { - LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e); - _exception = e; - return Boolean.FALSE; - } catch (Exception e) { - LOGGER.warn("Could not get partition count for topic {}", _topicName, e); - _exception = e; - throw e; } + return Boolean.TRUE; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index 39d061473e..e52610dd67 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -223,7 +223,7 @@ public class StreamConfig { return _serverUploadToDeepStore; } - private double extractFlushThresholdVarianceFraction(Map<String, String> streamConfigMap) { + public static double extractFlushThresholdVarianceFraction(Map<String, String> streamConfigMap) { String key = StreamConfigProperties.FLUSH_THRESHOLD_VARIANCE_FRACTION; String flushThresholdVarianceFractionStr = streamConfigMap.get(key); if (flushThresholdVarianceFractionStr != null) { @@ -245,7 +245,7 @@ public class StreamConfig { } } - private long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) { + public static long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE; String flushThresholdSegmentSizeStr = streamConfigMap.get(key); if 
(flushThresholdSegmentSizeStr == null) { @@ -264,7 +264,7 @@ public class StreamConfig { } } - protected int extractFlushThresholdRows(Map<String, String> streamConfigMap) { + public static int extractFlushThresholdRows(Map<String, String> streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS; String flushThresholdRowsStr = streamConfigMap.get(key); if (flushThresholdRowsStr == null) { @@ -288,7 +288,7 @@ public class StreamConfig { } } - protected int extractFlushThresholdSegmentRows(Map<String, String> streamConfigMap) { + public static int extractFlushThresholdSegmentRows(Map<String, String> streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_ROWS; String flushThresholdSegmentRowsStr = streamConfigMap.get(key); if (flushThresholdSegmentRowsStr != null) { @@ -302,7 +302,7 @@ public class StreamConfig { } } - protected long extractFlushThresholdTimeMillis(Map<String, String> streamConfigMap) { + public static long extractFlushThresholdTimeMillis(Map<String, String> streamConfigMap) { String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME; String flushThresholdTimeStr = streamConfigMap.get(key); if (flushThresholdTimeStr == null) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index 812b7b8e0f..a8c4d22cc3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -59,7 +59,7 @@ public abstract class StreamConsumerFactory { */ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { - return createPartitionLevelConsumer(clientId, partitionGroupConsumptionStatus.getPartitionGroupId()); + return createPartitionLevelConsumer(clientId, 
partitionGroupConsumptionStatus.getStreamPartitionGroupId()); } @Deprecated diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 85bb2801a1..052993a6d0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -81,7 +81,7 @@ public interface StreamMetadataProvider extends Closeable { // If partition group is still in progress, this value will be null for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) { newPartitionGroupMetadataList.add( - new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(), + new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(), currentPartitionGroupConsumptionStatus.getEndOffset())); } // Add PartitionGroupMetadata for new partitions diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 2aeba4160b..81e2d9655a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.utils; import com.google.common.base.Preconditions; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.stream.StreamConfig; /** @@ -46,15 +48,100 @@ public final class 
IngestionConfigUtils { private static final int DEFAULT_PUSH_ATTEMPTS = 5; private static final int DEFAULT_PUSH_PARALLELISM = 1; private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L; + // For partitions from different topics, we pad them with an offset to avoid collision. The offset is far higher + // than the normal max number of partitions on stream (e.g. 512). + public static final int PARTITION_PADDING_OFFSET = 10000; + public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING = + "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory"; + public static final String STREAM_TYPE = "streamType"; + public static final String STREAM_CONSUMER_FACTORY_CLASS = "stream.consumer.factory.class"; /** * Fetches the streamConfig from the given realtime table. * First, the ingestionConfigs->stream->streamConfigs will be checked. * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated). * @param tableConfig realtime table config - * @return streamConfigs map + * @return streamConfigs List of maps */ - public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) { + public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { + List<Map<String, String>> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have at least 1 stream"); + /* + Apply the following checks if there are multiple streamConfigs + 1. Check if all streamConfigs have the same stream type. TODO: remove this limitation once we've tested it + 2.
Ensure segment flush parameters are consistent across all streamConfigs. We need this because Pinot is predefining + the values before fetching stream partition info from stream. At the construction time, we don't know the value + extracted from a streamConfig would be applied to which segment. + TODO: remove this limitation once we've refactored the code and supported it. + */ + Map<String, String> firstStreamConfigMap = streamConfigMaps.get(0); + for (int i = 1; i < streamConfigMaps.size(); i++) { + Map<String, String> map = streamConfigMaps.get(i); + Preconditions.checkNotNull(map.get(STREAM_TYPE), + "streamType must be defined for all streamConfigs for REALTIME table: %s", tableNameWithType); + Preconditions.checkState(StringUtils.equals(map.get(STREAM_TYPE), firstStreamConfigMap.get(STREAM_TYPE)) + && StreamConfig.extractFlushThresholdRows(map) == StreamConfig.extractFlushThresholdRows( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdTimeMillis(map) == StreamConfig.extractFlushThresholdTimeMillis( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdVarianceFraction(map) + == StreamConfig.extractFlushThresholdVarianceFraction(firstStreamConfigMap) + && StreamConfig.extractFlushThresholdSegmentSize(map) == StreamConfig.extractFlushThresholdSegmentSize( + firstStreamConfigMap) + && StreamConfig.extractFlushThresholdSegmentRows(map) == StreamConfig.extractFlushThresholdSegmentRows( + firstStreamConfigMap), + "All streamConfigs must have the same stream type for REALTIME table: %s", tableNameWithType); + } + return streamConfigMaps; + } + if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) { + return Arrays.asList(tableConfig.getIndexingConfig().getStreamConfigs()); + } + throw new IllegalStateException("Could not find streamConfigs for REALTIME table: " + tableNameWithType); + } + + /** + * Getting the Pinot segment level partition id from the stream partition id.
+ * @param partitionId the partition group id from the stream + * @param index the index of the SteamConfig from the list of StreamConfigs + * @return + */ + public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int index) { + return index * PARTITION_PADDING_OFFSET + partitionId; + } + + /** + * Getting the Stream partition id from the Pinot segment partition id. + * @param partitionId the segment partition group id on Pinot + * @return + */ + public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) { + return partitionId % PARTITION_PADDING_OFFSET; + } + + /** + * Getting the StreamConfig index of StreamConfigs list from the Pinot segment partition id. + * @param partitionId the segment partition group id on Pinot + * @return + */ + public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) { + return partitionId / PARTITION_PADDING_OFFSET; + } + + /** + * Fetches the streamConfig from the list of streamConfigs according to the partitonGroupId. 
+ * @param tableConfig realtime table config + * @param partitionGroupId partitionGroupId + * @return streamConfig map + */ + public static Map<String, String> getStreamConfigMapWithPartitionGroupId( + TableConfig tableConfig, int partitionGroupId) { String tableNameWithType = tableConfig.getTableName(); Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType); @@ -63,10 +150,13 @@ public final class IngestionConfigUtils { && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table"); - streamConfigMap = streamConfigMaps.get(0); + Preconditions.checkState( + streamConfigMaps.size() > partitionGroupId / PARTITION_PADDING_OFFSET, + "Table does not have enough number of stream"); + streamConfigMap = streamConfigMaps.get(partitionGroupId / PARTITION_PADDING_OFFSET); } - if (streamConfigMap == null && tableConfig.getIndexingConfig() != null) { + if (partitionGroupId < PARTITION_PADDING_OFFSET + && streamConfigMap == null && tableConfig.getIndexingConfig() != null) { streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); } if (streamConfigMap == null) { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index b2b4c87b29..1e9517a330 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import 
org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; @@ -44,7 +45,9 @@ public class IngestionConfigUtilsTest { public void testGetStreamConfigMap() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); try { - IngestionConfigUtils.getStreamConfigMap(tableConfig); + IngestionConfigUtils.getStreamConfigMaps(tableConfig); + Assert.fail("Should fail for OFFLINE table"); + IngestionConfigUtils.getStreamConfigMaps(tableConfig); Assert.fail("Should fail for OFFLINE table"); } catch (IllegalStateException e) { // expected @@ -58,7 +61,7 @@ public class IngestionConfigUtilsTest { IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap))); tableConfig.setIngestionConfig(ingestionConfig); - Map<String, String> actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + Map<String, String> actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 1); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka"); @@ -69,30 +72,30 @@ public class IngestionConfigUtilsTest { IndexingConfig indexingConfig = new IndexingConfig(); indexingConfig.setStreamConfigs(deprecatedStreamConfigMap); tableConfig.setIndexingConfig(indexingConfig); - actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 1); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka"); - // fail if multiple found + // Able to get multiple stream configs ingestionConfig.setStreamIngestionConfig( new StreamIngestionConfig(Arrays.asList(streamConfigMap, deprecatedStreamConfigMap))); try { - 
IngestionConfigUtils.getStreamConfigMap(tableConfig); - Assert.fail("Should fail for multiple stream configs"); + List<Map<String, String>> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig); + Assert.assertEquals(streamConfigs.size(), 2); } catch (IllegalStateException e) { // expected } // get from indexing config tableConfig.setIngestionConfig(null); - actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Assert.assertEquals(actualStreamConfigsMap.size(), 2); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "foo"); // fail if found nowhere tableConfig.setIndexingConfig(new IndexingConfig()); try { - IngestionConfigUtils.getStreamConfigMap(tableConfig); + IngestionConfigUtils.getStreamConfigMaps(tableConfig); Assert.fail("Should fail for no stream config found"); } catch (IllegalStateException e) { // expected --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org