This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ae863a1087a370ed388a37c91cbd46acdece23b5
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Sat Jan 2 17:10:21 2021 -0800

    Server side changes and some fixes
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 45 ++++++++++------------
 .../realtime/SegmentBuildTimeLeaseExtender.java    |  3 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  7 ++--
 .../impl/fakestream/FakeStreamConsumerFactory.java |  2 +-
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  2 +-
 .../stream/kafka09/KafkaConsumerFactory.java       |  2 +-
 .../stream/kafka20/KafkaConsumerFactory.java       |  4 +-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  1 +
 .../kafka20/KafkaStreamMetadataProvider.java       | 19 ++++++++-
 ...y.java => PartitionGroupCheckpointFactory.java} |  6 +--
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  4 +-
 .../pinot/spi/stream/PartitionLevelConsumer.java   |  8 +++-
 .../pinot/spi/stream/StreamConsumerFactory.java    |  2 +-
 .../stream/StreamPartitionMsgOffsetFactory.java    |  6 ++-
 14 files changed, 70 insertions(+), 41 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 054676e..80aa9d8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
@@ -71,8 +70,9 @@ import 
org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.FetchResult;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -83,8 +83,6 @@ import 
org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.joda.time.DateTime;
@@ -152,13 +150,13 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   public class SegmentBuildDescriptor {
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
-    final StreamPartitionMsgOffset _offset;
+    final Checkpoint _offset;
     final long _waitTimeMillis;
     final long _buildTimeMillis;
     final long _segmentSizeBytes;
 
     public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable 
Map<String, File> metadataFileMap,
-        StreamPartitionMsgOffset offset, long buildTimeMillis, long 
waitTimeMillis, long segmentSizeBytes) {
+        Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long 
segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
       _offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -167,7 +165,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       _segmentSizeBytes = segmentSizeBytes;
     }
 
-    public StreamPartitionMsgOffset getOffset() {
+    public Checkpoint getOffset() {
       return _offset;
     }
 
@@ -225,7 +223,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
   private final MutableSegmentImpl _realtimeSegment;
-  private StreamPartitionMsgOffset _currentOffset;
+  private Checkpoint _currentOffset;
   private volatile State _state;
   private volatile int _numRowsConsumed = 0;
   private volatile int _numRowsIndexed = 0; // Can be different from 
_numRowsConsumed when metrics update is enabled.
@@ -236,12 +234,12 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final SegmentVersion _segmentVersion;
   private final SegmentBuildTimeLeaseExtender _leaseExtender;
   private SegmentBuildDescriptor _segmentBuildDescriptor;
-  private final StreamConsumerFactory _streamConsumerFactory;
-  private final StreamPartitionMsgOffsetFactory 
_streamPartitionMsgOffsetFactory;
+  private StreamConsumerFactory _streamConsumerFactory;
+  private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
-  private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch 
up to this one
+  private Checkpoint _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
   // It takes 30s to locate controller leader, and more if there are multiple 
controller failures.
@@ -272,7 +270,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final long _consumeStartTime;
-  private final StreamPartitionMsgOffset _startOffset;
+  private final Checkpoint _startOffset;
   private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
 
   private long _lastLogTime = 0;
@@ -371,7 +369,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / 
(idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    StreamPartitionMsgOffset lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory
+    Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when 
we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the 
old saved segment file is not valid
@@ -384,9 +382,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        FetchResult fetchResult = _partitionGroupConsumer
+        messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, 
_partitionLevelStreamConfig.getFetchTimeoutMillis());
-        messageBatch = fetchResult.getMessages();
         consecutiveErrorCount = 0;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
@@ -560,7 +557,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
           _state = State.HOLDING;
           SegmentCompletionProtocol.Response response = 
postSegmentConsumedMsg();
           SegmentCompletionProtocol.ControllerResponseStatus status = 
response.getStatus();
-          StreamPartitionMsgOffset rspOffset = extractOffset(response);
+          Checkpoint rspOffset = extractOffset(response);
           boolean success;
           switch (status) {
             case NOT_LEADER:
@@ -666,7 +663,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected StreamPartitionMsgOffset 
extractOffset(SegmentCompletionProtocol.Response response) {
+  protected Checkpoint extractOffset(SegmentCompletionProtocol.Response 
response) {
     if (response.getStreamPartitionMsgOffset() != null) {
       return 
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
     } else {
@@ -722,7 +719,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected StreamPartitionMsgOffset getCurrentOffset() {
+  protected Checkpoint getCurrentOffset() {
     return _currentOffset;
   }
 
@@ -891,14 +888,14 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   }
 
   private void closeKafkaConsumers() {
-    closePartitionLevelConsumer();
+    closePartitionGroupConsumer();
     closeStreamMetadataProvider();
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
       _partitionGroupConsumerSemaphore.release();
     }
   }
 
-  private void closePartitionLevelConsumer() {
+  private void closePartitionGroupConsumer() {
     try {
       _partitionGroupConsumer.close();
     } catch (Exception e) {
@@ -966,7 +963,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
       _leaseExtender.removeSegment(_segmentNameStr);
-      final StreamPartitionMsgOffset endOffset = 
_streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+      final Checkpoint endOffset = 
_streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
       segmentLogger
           .info("State: {}, transitioning from CONSUMING to ONLINE 
(startOffset: {}, endOffset: {})", _state.toString(),
               _startOffset, endOffset);
@@ -1043,7 +1040,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     return System.currentTimeMillis();
   }
 
-  private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, 
long timeoutMs) {
+  private boolean catchupToFinalOffset(Checkpoint endOffset, long timeoutMs) {
     _finalOffset = endOffset;
     _consumeEndTime = now() + timeoutMs;
     _state = State.CONSUMING_TO_ONLINE;
@@ -1316,10 +1313,10 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
    */
   private void makeStreamConsumer(String reason) {
     if (_partitionGroupConsumer != null) {
-      closePartitionLevelConsumer();
+      closePartitionGroupConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionGroupConsumer = 
_streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
+    _partitionGroupConsumer = 
_streamConsumerFactory.createPartitionGroupConsumer(_clientId, 
_partitionGroupMetadata);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 69d7e80..b1a1342 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,7 +92,7 @@ public class SegmentBuildTimeLeaseExtender {
    * @param initialBuildTimeMs is the initial time budget that 
SegmentCompletionManager has allocated.
    * @param offset The offset at which this segment is being built.
    */
-  public void addSegment(String segmentId, long initialBuildTimeMs, 
StreamPartitionMsgOffset offset) {
+  public void addSegment(String segmentId, long initialBuildTimeMs, Checkpoint 
offset) {
     final long initialDelayMs = initialBuildTimeMs * 9 / 10;
     final SegmentCompletionProtocol.Request.Params reqParams = new 
SegmentCompletionProtocol.Request.Params();
     
reqParams.withStreamPartitionMsgOffset(offset.toString()).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS)
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index d09bdeb..d7aec8d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -46,6 +46,7 @@ import 
org.apache.pinot.core.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -193,7 +194,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
     }
     {
@@ -207,7 +208,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
     }
     {
@@ -221,7 +222,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
     }
     segmentDataManager.destroy();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 6121eef..fbeb808 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -71,7 +71,7 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
 
 
   @Override
-  public PartitionGroupConsumer 
createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, 
PartitionGroupMetadata metadata) {
     return null;
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index d917d73..c7523e3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -121,7 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest 
extends RealtimeCluster
     }
 
     @Override
-    public PartitionGroupConsumer 
createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+    public PartitionGroupConsumer createPartitionGroupConsumer(String 
clientId, PartitionGroupMetadata metadata) {
       return null;
     }
   }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 82c282c..fe5a461 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -54,7 +54,7 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
   }
 
   @Override
-  public PartitionGroupConsumer 
createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, 
PartitionGroupMetadata metadata) {
     return null;
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index c73aacb..b6746ff 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -51,7 +51,7 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
   }
 
   @Override
-  public PartitionGroupConsumer 
createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
-    return null;
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, 
PartitionGroupMetadata metadata) {
+    return new KafkaPartitionLevelConsumer(clientId, _streamConfig, 
metadata.getPartitionGroupId());
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index f9b4365..25b1742 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index ef22b6a..1d3162a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
@@ -76,7 +77,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     // use offset criteria from stream config
     for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; 
i++) {
       StreamPartitionMsgOffset streamPartitionMsgOffset =
-          fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+          fetchStreamPartitionOffsetInternal(i, 
_streamConfig.getOffsetCriteria(), 5000);
       newPartitionGroupInfoList.add(new PartitionGroupInfo(i, 
streamPartitionMsgOffset.toString()));
     }
     return newPartitionGroupInfoList;
@@ -104,6 +105,22 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     return new LongMsgOffset(offset);
   }
 
+  private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int 
partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+    Preconditions.checkNotNull(offsetCriteria);
+    TopicPartition topicPartition = new TopicPartition(_topic, partitionId);
+    long offset = -1;
+    if (offsetCriteria.isLargest()) {
+      offset =  
_consumer.endOffsets(Collections.singletonList(topicPartition), 
Duration.ofMillis(timeoutMillis))
+          .get(topicPartition);
+    } else if (offsetCriteria.isSmallest()) {
+      offset =  
_consumer.beginningOffsets(Collections.singletonList(topicPartition), 
Duration.ofMillis(timeoutMillis))
+          .get(topicPartition);
+    } else {
+      throw new IllegalArgumentException("Unknown initial offset value " + 
offsetCriteria.toString());
+    }
+    return new LongMsgOffset(offset);
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
similarity index 89%
copy from 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
copy to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
index d61d32d..14d2f39 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  * An interface to be implemented by streams that are consumed using Pinot LLC 
consumption.
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface PartitionGroupCheckpointFactory {
   /**
    * Initialization, called once when the factory is created.
    * @param streamConfig
@@ -37,7 +37,7 @@ public interface StreamPartitionMsgOffsetFactory {
    * @param offsetStr
    * @return StreamPartitionMsgOffset
    */
-  StreamPartitionMsgOffset create(String offsetStr);
+  Checkpoint create(String offsetStr);
 
   /**
    * Construct an offset from another one provided, of the same type.
@@ -45,5 +45,5 @@ public interface StreamPartitionMsgOffsetFactory {
    * @param other
    * @return
    */
-  StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+  Checkpoint create(Checkpoint other);
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index bbbdaad..b421268 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.util.concurrent.TimeoutException;
 
 
 public interface PartitionGroupConsumer extends Closeable {
-  FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
+  MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout)
+      throws TimeoutException;
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3a0a1d2..3bedc8a 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public interface PartitionLevelConsumer extends Closeable {
+public interface PartitionLevelConsumer extends Closeable, 
PartitionGroupConsumer {
 
   /**
    * Is here for backward compatibility for a short time.
@@ -62,4 +62,10 @@ public interface PartitionLevelConsumer extends Closeable {
     long endOffsetLong = endOffset == null ? Long.MAX_VALUE : 
((LongMsgOffset)endOffset).getOffset();
     return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis);
   }
+
+  default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint 
endCheckpoint, int timeoutMillis)
+      throws java.util.concurrent.TimeoutException {
+    // TODO Issue 5359 remove this default implementation once all kafka 
consumers have migrated to use this API
+    return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, 
(StreamPartitionMsgOffset) endCheckpoint, timeoutMillis);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 9caf61b..db48a83 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -75,5 +75,5 @@ public abstract class StreamConsumerFactory {
   }
 
   // creates a consumer which consumes from a partition group
-  public abstract PartitionGroupConsumer 
createPartitionGroupConsumer(PartitionGroupMetadata metadata);
+  public abstract PartitionGroupConsumer createPartitionGroupConsumer(String 
clientId, PartitionGroupMetadata metadata);
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
index d61d32d..2e3386c 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  * An interface to be implemented by streams that are consumed using Pinot LLC 
consumption.
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface StreamPartitionMsgOffsetFactory extends 
PartitionGroupCheckpointFactory{
   /**
    * Initialization, called once when the factory is created.
    * @param streamConfig
@@ -46,4 +46,8 @@ public interface StreamPartitionMsgOffsetFactory {
    * @return
    */
   StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+
+  default Checkpoint create(Checkpoint other) {
+    return create((StreamPartitionMsgOffset) other);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to