This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 396dae01f0804bc916bfba1bf2d0c52a374b767e Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Sat Jan 2 18:16:09 2021 -0800 default methods to avoid interface changes --- .../realtime/LLRealtimeSegmentDataManager.java | 3 +- .../impl/fakestream/FakeStreamConsumerFactory.java | 8 +--- .../fakestream/FakeStreamMetadataProvider.java | 11 ----- ...lakyConsumerRealtimeClusterIntegrationTest.java | 5 --- .../stream/kafka09/KafkaConsumerFactory.java | 7 --- .../kafka09/KafkaStreamMetadataProvider.java | 46 +------------------ .../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +- .../stream/kafka20/KafkaConsumerFactory.java | 7 --- .../kafka20/KafkaStreamMetadataProvider.java | 52 ---------------------- .../spi/stream/PartitionGroupInfoFetcher.java | 4 +- .../pinot/spi/stream/StreamConsumerFactory.java | 4 +- .../pinot/spi/stream/StreamMetadataProvider.java | 36 ++++++++++++--- 12 files changed, 43 insertions(+), 142 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 80aa9d8..758c656 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1246,7 +1246,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { int numPartitions = columnPartitionConfig.getNumPartitions(); try { // fixme: get this from ideal state - int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size(); + int numStreamPartitions = _streamMetadataProvider + .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size(); if (numStreamPartitions != numPartitions) { segmentLogger.warn( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index fbeb808..b0dc7eb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -69,12 +69,6 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { return new FakeStreamMetadataProvider(_streamConfig); } - - @Override - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { - return null; - } - public static void main(String[] args) throws Exception { String clientId = "client_id_localhost_tester"; @@ -88,7 +82,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); - int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size(); + int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size(); System.out.println(partitionCount); // Partition metadata provider diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java index 0de0ce2..61aa01f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java @@ -48,17 +48,6 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider { return _numPartitions; } - @Override - public List<PartitionGroupInfo> getPartitionGroupInfoList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) - throws TimeoutException { - List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>(); - for (int i = 0; i < _numPartitions; i++) { - partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString())); - } - return partitionGroupMetadataList; - } - public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException { throw new UnsupportedOperationException("This method is deprecated"); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index c7523e3..4503de0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -119,10 +119,5 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster public StreamMetadataProvider createStreamMetadataProvider(String clientId) { throw new UnsupportedOperationException(); } - - @Override - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { - return null; - } } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java index fe5a461..615e354 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java @@ -19,8 +19,6 @@ package org.apache.pinot.plugin.stream.kafka09; import java.util.Set; -import org.apache.pinot.spi.stream.PartitionGroupConsumer; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamLevelConsumer; @@ -52,9 +50,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { public StreamMetadataProvider createStreamMetadataProvider(String clientId) { return new KafkaStreamMetadataProvider(clientId, _streamConfig); } - - @Override - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { - return null; - } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java index 2d0ad31..06ee697 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java @@ -22,13 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetRequest; @@ -40,8 +36,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; -import org.apache.pinot.spi.stream.PartitionGroupInfo; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -55,14 +49,13 @@ import org.slf4j.LoggerFactory; public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class); - private StreamConfig _streamConfig; - /** * Create a partition specific metadata provider + * @param streamConfig + * @param partition */ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl()); - _streamConfig = streamConfig; } /** @@ -71,21 +64,18 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen */ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) { super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl()); - _streamConfig = streamConfig; } @VisibleForTesting public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition, KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) { super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory); - _streamConfig = streamConfig; } @VisibleForTesting public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) { super(clientId, streamConfig, kafkaSimpleConsumerFactory); - _streamConfig = streamConfig; } /** @@ -94,12 +84,7 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen * @return */ @Override - @Deprecated public synchronized int fetchPartitionCount(long timeoutMillis) { - return fetchPartitionCountInternal(timeoutMillis); - } - - private int fetchPartitionCountInternal(long timeoutMillis) { int unknownTopicReplyCount = 0; final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10; int kafkaErrorCount = 0; @@ -160,33 +145,6 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen throw new TimeoutException(); } - /** - * Fetch the partitionGroupMetadata list. - * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. - */ - @Override - public List<PartitionGroupInfo> getPartitionGroupInfoList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) - throws java.util.concurrent.TimeoutException { - int partitionCount = fetchPartitionCountInternal(timeoutMillis); - List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); - - // add a PartitionGroupInfo into the list foreach partition already present in current. - // the end checkpoint is set as checkpoint - for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { - newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), - currentPartitionGroupMetadata.getEndCheckpoint())); - } - // add PartitiongroupInfo for new partitions - // use offset criteria from stream config - for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { - StreamPartitionMsgOffset streamPartitionMsgOffset = - fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000); - newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); - } - return newPartitionGroupInfoList; - } - public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws java.util.concurrent.TimeoutException { throw new UnsupportedOperationException("The use of this method s not supported"); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java index 9d3091e..90dc5ad 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java @@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory); - Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2); + Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2); } @Test diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java index b6746ff..e0d1015 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java @@ -19,8 +19,6 @@ package org.apache.pinot.plugin.stream.kafka20; import java.util.Set; -import org.apache.pinot.spi.stream.PartitionGroupConsumer; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamLevelConsumer; @@ -49,9 +47,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { public StreamMetadataProvider createStreamMetadataProvider(String clientId) { return new KafkaStreamMetadataProvider(clientId, _streamConfig); } - - @Override - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { - return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId()); - } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 1d3162a..38c49f5 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -21,17 +21,11 @@ package org.apache.pinot.plugin.stream.kafka20; import com.google.common.base.Preconditions; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.apache.kafka.common.TopicPartition; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; -import org.apache.pinot.spi.stream.PartitionGroupInfo; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -39,15 +33,12 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider { - private StreamConfig _streamConfig; - public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) { this(clientId, streamConfig, Integer.MIN_VALUE); } public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition); - _streamConfig = streamConfig; } @Override @@ -56,33 +47,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); } - /** - * Fetch the partitionGroupMetadata list. - * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. - */ - @Override - public List<PartitionGroupInfo> getPartitionGroupInfoList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) - throws TimeoutException { - int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); - List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); - - // add a PartitionGroupInfo into the list foreach partition already present in current. - // the end checkpoint is set as checkpoint - for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { - newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), - currentPartitionGroupMetadata.getEndCheckpoint())); - } - // add PartitiongroupInfo for new partitions - // use offset criteria from stream config - for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { - StreamPartitionMsgOffset streamPartitionMsgOffset = - fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000); - newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); - } - return newPartitionGroupInfoList; - } - public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws java.util.concurrent.TimeoutException { throw new UnsupportedOperationException("The use of this method is not supported"); @@ -105,22 +69,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa return new LongMsgOffset(offset); } - private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) { - Preconditions.checkNotNull(offsetCriteria); - TopicPartition topicPartition = new TopicPartition(_topic, partitionId); - long offset = -1; - if (offsetCriteria.isLargest()) { - offset = _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis)) - .get(topicPartition); - } else if (offsetCriteria.isSmallest()) { - offset = _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis)) - .get(topicPartition); - } else { - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString()); - } - return new LongMsgOffset(offset); - } - @Override public void close() throws IOException { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java index d13be10..f2d3f17 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java @@ -32,6 +32,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class); private List<PartitionGroupInfo> _partitionGroupInfoList; + private final StreamConfig _streamConfig; private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata; private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; @@ -40,6 +41,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> { public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); _topicName = streamConfig.getTopicName(); + _streamConfig = streamConfig; _currentPartitionGroupMetadata = currentPartitionGroupMetadataList; } @@ -61,7 +63,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> { String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName; try ( StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L); + _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(clientId, _streamConfig, _currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000); if (_exception != null) { // We had at least one failure, but succeeded now. Log an info LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index db48a83..f993fed 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -75,5 +75,7 @@ public abstract class StreamConsumerFactory { } // creates a consumer which consumes from a partition group - public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata); + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { + return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId()); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index f595ea3..572cd02 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -19,6 +19,8 @@ package org.apache.pinot.spi.stream; import java.io.Closeable; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; @@ -39,11 +41,6 @@ public interface StreamMetadataProvider extends Closeable { @Deprecated int fetchPartitionCount(long timeoutMillis); - // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state - List<PartitionGroupInfo> getPartitionGroupInfoList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) - throws TimeoutException; - // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0 @Deprecated long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) @@ -60,4 +57,33 @@ public interface StreamMetadataProvider extends Closeable { long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis); return new LongMsgOffset(offset); } + + /** + * Fetch the partitionGroupMetadata list. + * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. + */ + default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig, + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis) + throws TimeoutException { + int partitionCount = fetchPartitionCount(timeoutMillis); + List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); + + // add a PartitionGroupInfo into the list foreach partition already present in current. + // the end checkpoint is set as checkpoint + for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { + newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), + currentPartitionGroupMetadata.getEndCheckpoint())); + } + // add PartitiongroupInfo for new partitions + // use offset criteria from stream config + StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { + StreamMetadataProvider partitionMetadataProvider = + streamConsumerFactory.createPartitionMetadataProvider(clientId, i); + StreamPartitionMsgOffset streamPartitionMsgOffset = + partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis); + newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); + } + return newPartitionGroupInfoList; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org