This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e7a53146f1f9946acaffc6caa82471fef65ca551
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Fri Dec 11 13:57:25 2020 +0530

    Add kinesis code to handle offsets
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 13 ++++---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 42 +++++++++++++++++++---
 .../stream/kinesis/KinesisConsumerFactory.java     | 36 +++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 11 +++---
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 31 ++++++++++++++++
 .../stream/kinesis/KinesisShardMetadata.java       |  5 ++-
 6 files changed, 121 insertions(+), 17 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index a330e78..77f790b 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -1,23 +1,22 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _shardIterator;
+  String _sequenceNumber;
 
-  public KinesisCheckpoint(String shardIterator){
-    _shardIterator = shardIterator;
+  public KinesisCheckpoint(String sequenceNumber){
+    _sequenceNumber = sequenceNumber;
   }
 
-  public String getShardIterator() {
-    return _shardIterator;
+  public String getSequenceNumber() {
+    return _sequenceNumber;
   }
 
   @Override
   public byte[] serialize() {
-    return _shardIterator.getBytes();
+    return _sequenceNumber.getBytes();
   }
 
   @Override
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 251d831..dc44079 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -1,19 +1,26 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConsumer extends KinesisConnectionHandler implements 
ConsumerV2 {
+  String _stream;
 
   //TODO: Fetch AWS region from  Stream Config.
-  public KinesisConsumer(String awsRegion) {
+  public KinesisConsumer(String stream, String awsRegion) {
     super(awsRegion);
+    _stream = stream;
   }
 
   @Override
@@ -21,18 +28,43 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
     KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
 
-    String kinesisShardIteratorStart = 
kinesisStartCheckpoint.getShardIterator();
+    String kinesisStartSequenceNumber = 
kinesisStartCheckpoint.getSequenceNumber();
+    String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
 
-    GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+    GetShardIteratorResponse getShardIteratorResponse = 
_kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
+        
ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
+
+    String shardIterator = getShardIteratorResponse.shardIterator();
+    GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator).build();
     GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordsRequest);
 
     String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
 
+    //TODO: Get records in the loop and stop when end sequence number is 
reached or there is an exception.
     if(!getRecordsResponse.hasRecords()){
-      return new KinesisFetchResult(kinesisNextShardIterator, 
Collections.emptyList());
+      return new KinesisFetchResult(kinesisStartSequenceNumber, 
Collections.emptyList());
+    }
+
+    List<Record> recordList = new ArrayList<>();
+    recordList.addAll(getRecordsResponse.records());
+
+    String nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
+    while(kinesisNextShardIterator != null){
+      getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
+      getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+      if(getRecordsResponse.hasRecords()){
+        recordList.addAll(getRecordsResponse.records());
+        nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
+      }
+
+      if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 
1).sequenceNumber()) <= 0 ) {
+        nextStartSequenceNumber = kinesisEndSequenceNumber;
+        break;
+      }
+      kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    KinesisFetchResult kinesisFetchResult = new 
KinesisFetchResult(kinesisNextShardIterator,
+    KinesisFetchResult kinesisFetchResult = new 
KinesisFetchResult(nextStartSequenceNumber,
         getRecordsResponse.records());
 
     return kinesisFetchResult;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
new file mode 100644
index 0000000..6bd1e3a
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
+import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+
+
+public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
+  private StreamConfig _streamConfig;
+  private final String AWS_REGION = "aws-region";
+
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  @Override
+  public PartitionGroupMetadataMap getPartitionGroupsMetadata(
+      PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), 
_streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+
+  @Override
+  public SegmentNameGenerator getSegmentNameGenerator() {
+    return null;
+  }
+
+  @Override
+  public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
+    return new KinesisConsumer(_streamConfig.getTopicName(), 
_streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 5ef4e30..dc8e764 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -1,16 +1,19 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
-public class KinesisFetchResult implements FetchResult {
-  private String _nextShardIterator;
+public class KinesisFetchResult implements FetchResult<Record> {
+  private final String _nextShardIterator;
+  private final List<Record> _recordList;
 
   public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
      _nextShardIterator = nextShardIterator;
+     _recordList = recordList;
   }
 
   @Override
@@ -19,7 +22,7 @@ public class KinesisFetchResult implements FetchResult {
   }
 
   @Override
-  public byte[] getMessages() {
-    return new byte[0];
+  public List<Record> getMessages() {
+    return _recordList;
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
new file mode 100644
index 0000000..bc3fef2
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -0,0 +1,31 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler 
implements PartitionGroupMetadataMap {
+  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap 
= new HashMap<>();
+
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+    super(awsRegion);
+    ListShardsResponse listShardsResponse = 
_kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
+    List<Shard> shardList = listShardsResponse.shards();
+    for(Shard shard : shardList){
+      String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
+      KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream);
+      shardMetadata.setEndCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+    }
+  }
+
+  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
+      return _stringPartitionGroupMetadataMap;
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 07ede73..d50d821 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements 
PartitionGroupMetadata {
@@ -11,8 +12,10 @@ public class KinesisShardMetadata extends 
KinesisConnectionHandler implements Pa
   Checkpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName) {
-    GetShardIteratorResponse getShardIteratorResponse = 
_kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build());
+    GetShardIteratorResponse getShardIteratorResponse = 
_kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
+        ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new 
KinesisCheckpoint(getShardIteratorResponse.shardIterator());
+    _endCheckpoint = null;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to