This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ae863a1087a370ed388a37c91cbd46acdece23b5 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Sat Jan 2 17:10:21 2021 -0800 Server side changes and some fixes --- .../realtime/LLRealtimeSegmentDataManager.java | 45 ++++++++++------------ .../realtime/SegmentBuildTimeLeaseExtender.java | 3 +- .../realtime/LLRealtimeSegmentDataManagerTest.java | 7 ++-- .../impl/fakestream/FakeStreamConsumerFactory.java | 2 +- ...lakyConsumerRealtimeClusterIntegrationTest.java | 2 +- .../stream/kafka09/KafkaConsumerFactory.java | 2 +- .../stream/kafka20/KafkaConsumerFactory.java | 4 +- .../kafka20/KafkaPartitionLevelConsumer.java | 1 + .../kafka20/KafkaStreamMetadataProvider.java | 19 ++++++++- ...y.java => PartitionGroupCheckpointFactory.java} | 6 +-- .../pinot/spi/stream/PartitionGroupConsumer.java | 4 +- .../pinot/spi/stream/PartitionLevelConsumer.java | 8 +++- .../pinot/spi/stream/StreamConsumerFactory.java | 2 +- .../stream/StreamPartitionMsgOffsetFactory.java | 6 ++- 14 files changed, 70 insertions(+), 41 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 054676e..80aa9d8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -71,8 +70,9 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.stream.FetchResult; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; @@ -83,8 +83,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamDecoderProvider; import org.apache.pinot.spi.stream.StreamMessageDecoder; import org.apache.pinot.spi.stream.StreamMetadataProvider; -import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; import org.apache.pinot.spi.stream.TransientConsumerException; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.joda.time.DateTime; @@ -152,13 +150,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { public class SegmentBuildDescriptor { final File _segmentTarFile; final Map<String, File> _metadataFileMap; - final StreamPartitionMsgOffset _offset; + final Checkpoint _offset; final long _waitTimeMillis; final long _buildTimeMillis; final long _segmentSizeBytes; public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap, - StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) { + Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) { _segmentTarFile = segmentTarFile; _metadataFileMap = metadataFileMap; _offset = _streamPartitionMsgOffsetFactory.create(offset); @@ -167,7 +165,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentSizeBytes = segmentSizeBytes; } - public StreamPartitionMsgOffset getOffset() { + public Checkpoint getOffset() { return _offset; } @@ -225,7 +223,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final String _metricKeyName; private final ServerMetrics _serverMetrics; private final MutableSegmentImpl _realtimeSegment; - private StreamPartitionMsgOffset _currentOffset; + private Checkpoint _currentOffset; private volatile State _state; private volatile int _numRowsConsumed = 0; private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled. @@ -236,12 +234,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final SegmentVersion _segmentVersion; private final SegmentBuildTimeLeaseExtender _leaseExtender; private SegmentBuildDescriptor _segmentBuildDescriptor; - private final StreamConsumerFactory _streamConsumerFactory; - private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; + private StreamConsumerFactory _streamConsumerFactory; + private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory; // Segment end criteria private volatile long _consumeEndTime = 0; - private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one + private Checkpoint _finalOffset; // Used when we want to catch up to this one private volatile boolean _shouldStop = false; // It takes 30s to locate controller leader, and more if there are multiple controller failures. @@ -272,7 +270,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final String _instanceId; private final ServerSegmentCompletionProtocolHandler _protocolHandler; private final long _consumeStartTime; - private final StreamPartitionMsgOffset _startOffset; + private final Checkpoint _startOffset; private final PartitionLevelStreamConfig _partitionLevelStreamConfig; private long _lastLogTime = 0; @@ -371,7 +369,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { final long idlePipeSleepTimeMillis = 100; final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig .getFetchTimeoutMillis()); // 3 minute count - StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory + Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory .create(_currentOffset); // so that we always update the metric when we enter this method. long consecutiveIdleCount = 0; // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid @@ -384,9 +382,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Update _currentOffset upon return from this method MessageBatch messageBatch; try { - FetchResult fetchResult = _partitionGroupConsumer + messageBatch = _partitionGroupConsumer .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); - messageBatch = fetchResult.getMessages(); consecutiveErrorCount = 0; } catch (TransientConsumerException e) { handleTransientStreamErrors(e); @@ -560,7 +557,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _state = State.HOLDING; SegmentCompletionProtocol.Response response = postSegmentConsumedMsg(); SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus(); - StreamPartitionMsgOffset rspOffset = extractOffset(response); + Checkpoint rspOffset = extractOffset(response); boolean success; switch (status) { case NOT_LEADER: @@ -666,7 +663,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } @VisibleForTesting - protected StreamPartitionMsgOffset extractOffset(SegmentCompletionProtocol.Response response) { + protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) { if (response.getStreamPartitionMsgOffset() != null) { return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset()); } else { @@ -722,7 +719,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } @VisibleForTesting - protected StreamPartitionMsgOffset getCurrentOffset() { + protected Checkpoint getCurrentOffset() { return _currentOffset; } @@ -891,14 +888,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } private void closeKafkaConsumers() { - closePartitionLevelConsumer(); + closePartitionGroupConsumer(); closeStreamMetadataProvider(); if (_acquiredConsumerSemaphore.compareAndSet(true, false)) { _partitionGroupConsumerSemaphore.release(); } } - private void closePartitionLevelConsumer() { + private void closePartitionGroupConsumer() { try { _partitionGroupConsumer.close(); } catch (Exception e) { @@ -966,7 +963,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Remove the segment file before we do anything else. removeSegmentFile(); _leaseExtender.removeSegment(_segmentNameStr); - final StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset()); + final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset()); segmentLogger .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(), _startOffset, endOffset); @@ -1043,7 +1040,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { return System.currentTimeMillis(); } - private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) { + private boolean catchupToFinalOffset(Checkpoint endOffset, long timeoutMs) { _finalOffset = endOffset; _consumeEndTime = now() + timeoutMs; _state = State.CONSUMING_TO_ONLINE; @@ -1316,10 +1313,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { */ private void makeStreamConsumer(String reason) { if (_partitionGroupConsumer != null) { - closePartitionLevelConsumer(); + closePartitionGroupConsumer(); } segmentLogger.info("Creating new stream consumer, reason: {}", reason); - _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata); + _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupMetadata); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java index 69d7e80..b1a1342 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,7 @@ public class SegmentBuildTimeLeaseExtender { * @param initialBuildTimeMs is the initial time budget that SegmentCompletionManager has allocated. * @param offset The offset at which this segment is being built. */ - public void addSegment(String segmentId, long initialBuildTimeMs, StreamPartitionMsgOffset offset) { + public void addSegment(String segmentId, long initialBuildTimeMs, Checkpoint offset) { final long initialDelayMs = initialBuildTimeMs * 9 / 10; final SegmentCompletionProtocol.Request.Params reqParams = new SegmentCompletionProtocol.Request.Params(); reqParams.withStreamPartitionMsgOffset(offset.toString()).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index d09bdeb..d7aec8d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -46,6 +46,7 @@ import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.LongMsgOffsetFactory; import org.apache.pinot.spi.stream.PermanentConsumerException; @@ -193,7 +194,7 @@ public class LLRealtimeSegmentDataManagerTest { + " \"status\" : \"CATCH_UP\"" + "}"; SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); - StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response); + Checkpoint extractedOffset = segmentDataManager.extractOffset(response); Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0); } { @@ -207,7 +208,7 @@ public class LLRealtimeSegmentDataManagerTest { + " \"status\" : \"CATCH_UP\"" + "}"; SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); - StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response); + Checkpoint extractedOffset = segmentDataManager.extractOffset(response); Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0); } { @@ -221,7 +222,7 @@ public class LLRealtimeSegmentDataManagerTest { + " \"status\" : \"CATCH_UP\"" + "}"; SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); - StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response); + Checkpoint extractedOffset = segmentDataManager.extractOffset(response); Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0); } segmentDataManager.destroy(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index 6121eef..fbeb808 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -71,7 +71,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { @Override - public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { return null; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index d917d73..c7523e3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -121,7 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster } @Override - public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { return null; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java index 82c282c..fe5a461 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java @@ -54,7 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { } @Override - public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { return null; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java index c73aacb..b6746ff 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java @@ -51,7 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { } @Override - public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { - return null; + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { + return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId()); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index f9b4365..25b1742 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.utils.Bytes; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index ef22b6a..1d3162a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.kafka.common.TopicPartition; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupInfo; @@ -76,7 +77,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa // use offset criteria from stream config for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { StreamPartitionMsgOffset streamPartitionMsgOffset = - fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000); + fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000); newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); } return newPartitionGroupInfoList; @@ -104,6 +105,22 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa return new LongMsgOffset(offset); } + private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) { + Preconditions.checkNotNull(offsetCriteria); + TopicPartition topicPartition = new TopicPartition(_topic, partitionId); + long offset = -1; + if (offsetCriteria.isLargest()) { + offset = _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis)) + .get(topicPartition); + } else if (offsetCriteria.isSmallest()) { + offset = _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis)) + .get(topicPartition); + } else { + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString()); + } + return new LongMsgOffset(offset); + } + @Override public void close() throws IOException { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java similarity index 89% copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java copy to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java index d61d32d..14d2f39 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java @@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability; * An interface to be implemented by streams that are consumed using Pinot LLC consumption. */ @InterfaceStability.Evolving -public interface StreamPartitionMsgOffsetFactory { +public interface PartitionGroupCheckpointFactory { /** * Initialization, called once when the factory is created. * @param streamConfig @@ -37,7 +37,7 @@ public interface StreamPartitionMsgOffsetFactory { * @param offsetStr * @return StreamPartitionMsgOffset */ - StreamPartitionMsgOffset create(String offsetStr); + Checkpoint create(String offsetStr); /** * Construct an offset from another one provided, of the same type. @@ -45,5 +45,5 @@ public interface StreamPartitionMsgOffsetFactory { * @param other * @return */ - StreamPartitionMsgOffset create(StreamPartitionMsgOffset other); + Checkpoint create(Checkpoint other); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java index bbbdaad..b421268 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -19,8 +19,10 @@ package org.apache.pinot.spi.stream; import java.io.Closeable; +import java.util.concurrent.TimeoutException; public interface PartitionGroupConsumer extends Closeable { - FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout); + MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) + throws TimeoutException; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java index 3a0a1d2..3bedc8a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java @@ -28,7 +28,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface PartitionLevelConsumer extends Closeable { +public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsumer { /** * Is here for backward compatibility for a short time. @@ -62,4 +62,10 @@ public interface PartitionLevelConsumer extends Closeable { long endOffsetLong = endOffset == null ? Long.MAX_VALUE : ((LongMsgOffset)endOffset).getOffset(); return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis); } + + default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis) + throws java.util.concurrent.TimeoutException { + // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API + return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index 9caf61b..db48a83 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -75,5 +75,5 @@ public abstract class StreamConsumerFactory { } // creates a consumer which consumes from a partition group - public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata); + public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java index d61d32d..2e3386c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java @@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability; * An interface to be implemented by streams that are consumed using Pinot LLC consumption. */ @InterfaceStability.Evolving -public interface StreamPartitionMsgOffsetFactory { +public interface StreamPartitionMsgOffsetFactory extends PartitionGroupCheckpointFactory{ /** * Initialization, called once when the factory is created. * @param streamConfig @@ -46,4 +46,8 @@ public interface StreamPartitionMsgOffsetFactory { * @return */ StreamPartitionMsgOffset create(StreamPartitionMsgOffset other); + + default Checkpoint create(Checkpoint other) { + return create((StreamPartitionMsgOffset) other); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org