This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a3deab3ea5154f03debf10751e273dc1a82ec919
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Jan 5 01:19:28 2021 +0530

    Add support for stream partition offsets (#6402)
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  3 +-
 .../plugin/stream/kinesis/KinesisConsumer.java     | 10 +++++--
 .../stream/kinesis/KinesisConsumerFactory.java     |  5 ++++
 .../stream/kinesis/KinesisMsgOffsetFactory.java    | 32 ++++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 15 ++++++----
 5 files changed, 56 insertions(+), 9 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 1b8f86e..d42f899 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
-public class KinesisCheckpoint implements Checkpoint {
+public class KinesisCheckpoint implements StreamPartitionMsgOffset {
   private Map<String, String> _shardToStartSequenceMap;
 
   public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 8a24208..8ed3de7 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -80,6 +80,7 @@ public class KinesisConsumer extends KinesisConnectionHandler 
implements Partiti
         createConnection();
       }
 
+      //TODO: iterate upon all the shardIds in the map
       Map.Entry<String, String> next = 
kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
       String shardIterator = getShardIterator(next.getKey(), next.getValue());
 
@@ -125,7 +126,7 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
         nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
       }
 
-      return new KinesisRecordsBatch(recordList);
+      return new KinesisRecordsBatch(recordList, next.getKey());
     } catch (ProvisionedThroughputExceededException e) {
       LOG.warn("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
@@ -147,13 +148,16 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Partiti
   }
 
   private KinesisRecordsBatch handleException(KinesisCheckpoint start, 
List<Record> recordList) {
+    String shardId = 
start.getShardToStartSequenceMap().entrySet().iterator().next().getKey();
+
     if (recordList.size() > 0) {
       String nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
       Map<String, String> newCheckpoint = new 
HashMap<>(start.getShardToStartSequenceMap());
       newCheckpoint.put(newCheckpoint.keySet().iterator().next(), 
nextStartSequenceNumber);
-      return new KinesisRecordsBatch(recordList);
+
+      return new KinesisRecordsBatch(recordList, shardId);
     } else {
-      return new KinesisRecordsBatch(recordList);
+      return new KinesisRecordsBatch(recordList, shardId);
 
     }
   }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index aa90812..631f240 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 
 
 public class KinesisConsumerFactory extends StreamConsumerFactory {
@@ -55,4 +56,8 @@ public class KinesisConsumerFactory extends 
StreamConsumerFactory {
     return new KinesisConsumer(new KinesisConfig(_streamConfig));
   }
 
+  @Override
+  public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
+    return new KinesisMsgOffsetFactory();
+  }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
new file mode 100644
index 0000000..f234bae
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.io.IOException;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+
+
+/**
+ * {@link StreamPartitionMsgOffsetFactory} for Kinesis: every offset it creates is a {@link KinesisCheckpoint}.
+ */
+public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory {
+
+  // Populated by init(); not read by the create(...) methods below.
+  KinesisConfig _kinesisConfig;
+
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _kinesisConfig = new KinesisConfig(streamConfig);
+  }
+
+  /**
+   * Parses a serialized {@link KinesisCheckpoint}.
+   *
+   * @param offsetStr serialized checkpoint string
+   * @return the parsed checkpoint
+   * @throws IllegalArgumentException if {@code offsetStr} cannot be parsed (the parse error is attached as cause)
+   */
+  @Override
+  public StreamPartitionMsgOffset create(String offsetStr) {
+    try {
+      return new KinesisCheckpoint(offsetStr);
+    } catch (IOException e) {
+      // Fail loudly instead of returning null, so a malformed offset is reported together with its cause.
+      throw new IllegalArgumentException("Invalid Kinesis checkpoint string: " + offsetStr, e);
+    }
+  }
+
+  /**
+   * Creates a new {@link KinesisCheckpoint} from the shard-to-start-sequence map of {@code other}.
+   *
+   * @param other source offset
+   * @throws ClassCastException if {@code other} is not a {@link KinesisCheckpoint}
+   */
+  @Override
+  public StreamPartitionMsgOffset create(StreamPartitionMsgOffset other) {
+    return new KinesisCheckpoint(((KinesisCheckpoint) other).getShardToStartSequenceMap());
+  }
+
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index 04bf4e6..fb4bfb3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -27,9 +29,11 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 
 public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   private List<Record> _recordList;
+  private String _shardId;
 
-  public KinesisRecordsBatch(List<Record> recordList) {
+  public KinesisRecordsBatch(List<Record> recordList, String shardId) {
     _recordList = recordList;
+    _shardId = shardId;
   }
 
   @Override
@@ -39,13 +43,12 @@ public class KinesisRecordsBatch implements 
MessageBatch<byte[]> {
 
   @Override
   public byte[] getMessageAtIndex(int index) {
-    return _recordList.get(index).data().asByteArray();
+    return _recordList.get(index).data().asByteArray(); // asByteBuffer() is read-only, so its array() would throw
   }
 
   @Override
   public int getMessageOffsetAtIndex(int index) {
-    //TODO: Doesn't translate to offset. Needs to be replaced.
-    return _recordList.get(index).hashCode();
+    return 0; // the payload always begins at index 0 of the array returned by getMessageAtIndex()
   }
 
   @Override
@@ -60,7 +63,9 @@ public class KinesisRecordsBatch implements 
MessageBatch<byte[]> {
 
   @Override
   public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int 
index) {
-    throw new UnsupportedOperationException();
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber());
+    return new KinesisCheckpoint(shardToSequenceMap);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to