This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a7fba5a7ffc843ea576d23e330cd2fd8441ee5fb Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Thu Dec 31 14:46:46 2020 -0800 Separate PartitionGroupInfo and PartitionGroupMetadata --- .../helix/core/PinotHelixResourceManager.java | 12 +-- .../realtime/PinotLLCRealtimeSegmentManager.java | 108 ++++++++------------- .../impl/fakestream/FakeStreamConsumerFactory.java | 3 +- .../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +- .../kafka20/KafkaStreamMetadataProvider.java | 28 ++++-- .../pinot/spi/stream/PartitionGroupInfo.java | 43 ++++++++ .../pinot/spi/stream/StreamMetadataProvider.java | 6 +- 7 files changed, 115 insertions(+), 87 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1f36e4f..f0d52bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -126,12 +126,7 @@ import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.stream.StreamConsumerFactory; -import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; -import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -1361,9 +1356,8 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); - LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); - _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState); + LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName); } else { if (streamConfig.hasHighLevelConsumerType()) { @@ -1391,7 +1385,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState); LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); } else { LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 9b03fa4..528125b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -78,6 +79,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.PartitionOffsetFetcher; @@ -221,8 +223,14 @@ public class PinotLLCRealtimeSegmentManager { StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); StreamMetadataProvider streamMetadataProvider = streamConsumerFactory .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis()); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000); + + List<PartitionGroupInfo> newPartitionGroupMetadataList; + try { + newPartitionGroupMetadataList = + streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000); + } catch (TimeoutException e) { + throw new IllegalStateException(e); + } int numPartitionGroups = newPartitionGroupMetadataList.size(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); @@ -234,8 +242,8 @@ public class PinotLLCRealtimeSegmentManager { long currentTimeMs = getCurrentTimeMs(); Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); - for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { - String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(), + for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) { + String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, instancePartitionsMap); @@ -277,50 +285,6 @@ public class PinotLLCRealtimeSegmentManager { } /** - * Sets up the initial segments for a new LLC real-time table. - * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured. - */ - public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { - Preconditions.checkState(!_isStopping, "Segment manager is stopping"); - - String realtimeTableName = tableConfig.getTableName(); - LOGGER.info("Setting up new LLC table: {}", realtimeTableName); - - // Make sure all the existing segments are HLC segments - List<String> currentSegments = getAllSegments(realtimeTableName); - for (String segmentName : currentSegments) { - // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500 - Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName), - "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName); - } - - _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - - PartitionLevelStreamConfig streamConfig = - new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); - InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); - List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); - int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size(); - int numReplicas = getNumReplicas(tableConfig, instancePartitions); - - SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); - Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = - Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); - - long currentTimeMs = getCurrentTimeMs(); - Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); - for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) { - String segmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups, - numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, - instancePartitionsMap); - } - - setIdealState(realtimeTableName, idealState); - } - - /** * Removes all LLC segments from the given IdealState. */ public void removeLLCSegments(IdealState idealState) { @@ -538,15 +502,23 @@ public class PinotLLCRealtimeSegmentManager { // Step-2 // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up + // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS] List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); - StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); StreamMetadataProvider streamMetadataProvider = streamConsumerFactory .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis()); + // find new partition groups [A],[B],[C],[D] - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); + List<PartitionGroupInfo> newPartitionGroupMetadataList; + try { + newPartitionGroupMetadataList = + streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); + } catch (TimeoutException e) { + throw new IllegalStateException(e); + } // create new segment metadata, only if it is not IN_PROGRESS in the current state Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect( @@ -555,25 +527,24 @@ public class PinotLLCRealtimeSegmentManager { List<String> newConsumingSegmentNames = new ArrayList<>(); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { - int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId(); + for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) { + int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId(); PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId); - if (currentPartitionGroupMetadata == null) { // not present in current state + if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found. // make new segment - LLCSegmentName newLLCSegmentName = - new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), - IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs, - committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); - newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName()); + String newLLCSegmentName = + setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs, + instancePartitions, numPartitions, numReplicas); + newConsumingSegmentNames.add(newLLCSegmentName); } else { String currentStatus = currentPartitionGroupMetadata.getStatus(); - if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state - // make new segment + if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { + // not IN_PROGRESS anymore in current state. Should be DONE. + // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment + // todo: skip this if the partition doesn't match with the committing segment? LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId, currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(), - IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs, + createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName()); } @@ -1181,19 +1152,20 @@ public class PinotLLCRealtimeSegmentManager { * Sets up a new partition. * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name. */ - private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId, + private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, PartitionGroupInfo partitionGroupInfo, long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) { String realtimeTableName = tableConfig.getTableName(); + int partitionGroupId = partitionGroupInfo.getPartitionGroupId(); + String startCheckpoint = partitionGroupInfo.getStartCheckpoint(); LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs); String newSegmentName = newLLCSegmentName.getSegmentName(); - StreamPartitionMsgOffset startOffset = - getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId); + CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(null, startOffset.toString(), 0); + new CommittingSegmentDescriptor(null, startCheckpoint, 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs, committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index 289b226..54be1b6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.realtime.impl.fakestream; +import java.util.Collections; import java.util.Set; import org.apache.pinot.core.util.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -87,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); - int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size(); + int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size(); System.out.println(partitionCount); // Partition metadata provider diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java index fbdfdfb..43b72a8 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java @@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory); - Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L), 2); + Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2); } @Test diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 187c61b..eb606f2 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -29,6 +29,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; @@ -37,12 +38,15 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider { + private StreamConfig _streamConfig; + public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) { this(clientId, streamConfig, Integer.MIN_VALUE); } public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition); + _streamConfig = streamConfig; } @Override @@ -57,14 +61,26 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa * Hence current partition groups are not needed to compute the new partition groups */ @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { + public List<PartitionGroupInfo> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) + throws TimeoutException { int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); - List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount); - for (int i = 0; i < partitionCount; i++) { - partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i)); + List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); + + // add a PartitionGroupInfo into the list foreach partition already present in current. + // the end checkpoint is set as checkpoint + for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { + newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), + currentPartitionGroupMetadata.getEndCheckpoint())); + } + // add PartitiongroupInfo for new partitions + // use offset criteria from stream config + for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { + StreamPartitionMsgOffset streamPartitionMsgOffset = + fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000); + newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); } - return partitionGroupMetadataList; + return newPartitionGroupInfoList; } public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java new file mode 100644 index 0000000..438e148 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +public class PartitionGroupInfo { + + // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider) + private final int _partitionGroupId; + private String _startCheckpoint; + + public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) { + _partitionGroupId = partitionGroupId; + _startCheckpoint = startCheckpoint; + } + + public void setStartCheckpoint(String startCheckpoint) { + _startCheckpoint = startCheckpoint; + } + + public int getPartitionGroupId() { + return _partitionGroupId; + } + + public String getStartCheckpoint() { + return _startCheckpoint; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 5b9104e..a9cd2d6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream; import java.io.Closeable; import java.util.List; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; @@ -39,8 +40,9 @@ public interface StreamMetadataProvider extends Closeable { int fetchPartitionCount(long timeoutMillis); // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state - List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis); + List<PartitionGroupInfo> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) + throws TimeoutException; // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0 @Deprecated --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org