This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0d8c1b422e58013c48e07c5469698229936a621
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Sun Dec 20 00:54:16 2020 +0530

    Fix consumer code
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 11 +--
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 15 +++-
 .../stream/kinesis/KinesisConnectionHandler.java   | 21 +++++-
 .../plugin/stream/kinesis/KinesisConsumer.java     | 88 +++++++++++++++-------
 .../stream/kinesis/KinesisConsumerFactory.java     |  2 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  8 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  9 +--
 .../stream/kinesis/KinesisShardMetadata.java       | 11 +--
 8 files changed, 112 insertions(+), 53 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 97e5eef..f863d17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -15,7 +15,7 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.15.42</aws.version>
+    <aws.version>2.13.46</aws.version>
   </properties>
 
   <dependencies>
@@ -24,12 +24,13 @@
       <artifactId>kinesis</artifactId>
       <version>${aws.version}</version>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-json</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.12.0</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-spi</artifactId>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 77f790b..8448665 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -4,9 +4,11 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 
 public class KinesisCheckpoint implements Checkpoint {
+  String _shardId;
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String sequenceNumber){
+  public KinesisCheckpoint(String shardId, String sequenceNumber){
+    _shardId = shardId;
     _sequenceNumber = sequenceNumber;
   }
 
@@ -14,6 +16,14 @@ public class KinesisCheckpoint implements Checkpoint {
     return _sequenceNumber;
   }
 
+  public String getShardId() {
+    return _shardId;
+  }
+
+  public void setShardId(String shardId) {
+    _shardId = shardId;
+  }
+
   @Override
   public byte[] serialize() {
     return _sequenceNumber.getBytes();
@@ -21,7 +31,8 @@ public class KinesisCheckpoint implements Checkpoint {
 
   @Override
   public Checkpoint deserialize(byte[] blob) {
-    return new KinesisCheckpoint(new String(blob));
+    //TODO: Implement SerDe
+    return new KinesisCheckpoint("", new String(blob));
   }
 
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 7ea24c0..d8888fa 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -1,25 +1,42 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.List;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 
 public class KinesisConnectionHandler {
-  String _awsRegion = "";
+  private String _stream;
+  private String _awsRegion;
   KinesisClient _kinesisClient;
 
   public KinesisConnectionHandler(){
 
   }
 
-  public KinesisConnectionHandler(String awsRegion){
+  public KinesisConnectionHandler(String stream, String awsRegion){
+    _stream = stream;
     _awsRegion = awsRegion;
     _kinesisClient = 
KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
   }
 
+  public List<Shard> getShards(){
+    ListShardsResponse listShardsResponse =  
_kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
+    return listShardsResponse.shards();
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index dc44079..7bc1006 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -3,6 +3,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
@@ -16,57 +17,86 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class KinesisConsumer extends KinesisConnectionHandler implements 
ConsumerV2 {
   String _stream;
+  Integer _maxRecords;
 
   //TODO: Fetch AWS region from  Stream Config.
   public KinesisConsumer(String stream, String awsRegion) {
-    super(awsRegion);
+    super(stream, awsRegion);
     _stream = stream;
+    _maxRecords = 20;
+  }
+
+  public KinesisConsumer(String stream, String awsRegion, StreamConfig 
streamConfig) {
+    super(stream, awsRegion);
+    _stream = stream;
+    _maxRecords = 
Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", 
"20"));
   }
 
   @Override
-  public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+  public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long 
timeout) {
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
-    KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
 
-    String kinesisStartSequenceNumber = 
kinesisStartCheckpoint.getSequenceNumber();
-    String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+    String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-    GetShardIteratorResponse getShardIteratorResponse = 
_kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
-        
ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
-
-    String shardIterator = getShardIteratorResponse.shardIterator();
-    GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator).build();
-    GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordsRequest);
+    List<Record> recordList = new ArrayList<>();
 
-    String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+    String kinesisEndSequenceNumber = null;
 
-    //TODO: Get records in the loop and stop when end sequence number is 
reached or there is an exception.
-    if(!getRecordsResponse.hasRecords()){
-      return new KinesisFetchResult(kinesisStartSequenceNumber, 
Collections.emptyList());
+    if(end != null) {
+      KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+      kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
     }
 
-    List<Record> recordList = new ArrayList<>();
-    recordList.addAll(getRecordsResponse.records());
+    String nextStartSequenceNumber = null;
+    Long startTimestamp = System.currentTimeMillis();
+
+    while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){
+      GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator).build();
+      GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordsRequest);
 
-    String nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
-    while(kinesisNextShardIterator != null){
-      getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
-      getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
-      if(getRecordsResponse.hasRecords()){
+      if(getRecordsResponse.records().size() > 0){
         recordList.addAll(getRecordsResponse.records());
         nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
-      }
 
-      if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 
1).sequenceNumber()) <= 0 ) {
-        nextStartSequenceNumber = kinesisEndSequenceNumber;
-        break;
+        if(kinesisEndSequenceNumber != null && 
kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 
1).sequenceNumber()) <= 0 ){
+          nextStartSequenceNumber = kinesisEndSequenceNumber;
+          break;
+        }
+
+        if(recordList.size() >= _maxRecords) break;
       }
-      kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+
+      shardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    KinesisFetchResult kinesisFetchResult = new 
KinesisFetchResult(nextStartSequenceNumber,
-        getRecordsResponse.records());
+    if(nextStartSequenceNumber == null && recordList.size() > 0){
+      nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
+    }
+
+    KinesisCheckpoint kinesisCheckpoint = new 
KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber);
+    KinesisFetchResult kinesisFetchResult = new 
KinesisFetchResult(kinesisCheckpoint,
+        recordList);
 
     return kinesisFetchResult;
   }
+
+  private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
+    GetShardIteratorResponse getShardIteratorResponse;
+
+    if(kinesisStartCheckpoint.getSequenceNumber() != null) {
+      String kinesisStartSequenceNumber = 
kinesisStartCheckpoint.getSequenceNumber();
+      getShardIteratorResponse = _kinesisClient.getShardIterator(
+          
GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+    } else{
+      getShardIteratorResponse = _kinesisClient.getShardIterator(
+          
GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+    }
+
+    return getShardIteratorResponse.shardIterator();
+  }
+
+  private boolean isTimedOut(Long startTimestamp, Long timeout) {
+    return (System.currentTimeMillis() - startTimestamp) >= timeout;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 6bd1e3a..bdbc348 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), 
_streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+    return new KinesisConsumer(_streamConfig.getTopicName(), 
_streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), 
_streamConfig);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index dc8e764..2996b28 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -8,17 +8,17 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 
 
 public class KinesisFetchResult implements FetchResult<Record> {
-  private final String _nextShardIterator;
+  private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
-  public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
-     _nextShardIterator = nextShardIterator;
+  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> 
recordList){
+     _kinesisCheckpoint = kinesisCheckpoint;
      _recordList = recordList;
   }
 
   @Override
   public Checkpoint getLastCheckpoint() {
-    return new KinesisCheckpoint(_nextShardIterator);
+    return _kinesisCheckpoint;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 87f7235..d15804e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -13,13 +13,12 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   private final List<PartitionGroupMetadata> 
_stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
-    super(awsRegion);
-    ListShardsResponse listShardsResponse = 
_kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
-    List<Shard> shardList = listShardsResponse.shards();
+    super(stream, awsRegion);
+    List<Shard> shardList = getShards();
     for(Shard shard : shardList){
       String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
-      KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream);
-      shardMetadata.setStartCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
+      KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream, awsRegion);
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), 
endingSequenceNumber));
       _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 4a19285..693b307 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
@@ -12,11 +13,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   Checkpoint _startCheckpoint;
   Checkpoint _endCheckpoint;
 
-  public KinesisShardMetadata(String shardId, String streamName) {
-    GetShardIteratorResponse getShardIteratorResponse = 
_kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
-        ShardIteratorType.LATEST).streamName(streamName).build());
-    _startCheckpoint = new 
KinesisCheckpoint(getShardIteratorResponse.shardIterator());
-    _endCheckpoint = null;
+  public KinesisShardMetadata(String shardId, String streamName, String 
awsRegion) {
+    super(streamName, awsRegion);
+
+    _startCheckpoint = new KinesisCheckpoint(shardId, null);
+    _endCheckpoint = new KinesisCheckpoint(shardId, null);
     _shardId = shardId;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to