This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit c5c42d497320a3e7aedca4a7e1c43808e69222f9 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Thu Dec 31 11:24:42 2020 +0530 Return message batch instead of list in the fetch result --- .../plugin/stream/kinesis/KinesisFetchResult.java | 7 +-- .../plugin/stream/kinesis/KinesisRecordsBatch.java | 52 ++++++++++++++++++++++ .../plugin/stream/kinesis/KinesisConsumerTest.java | 7 +-- .../apache/pinot/spi/stream/v2/FetchResult.java | 3 +- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index aedcd5d..39561f3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; import java.util.List; +import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.FetchResult; import software.amazon.awssdk.services.kinesis.model.Record; -public class KinesisFetchResult implements FetchResult<Record> { +public class KinesisFetchResult implements FetchResult<byte[]> { private final KinesisCheckpoint _kinesisCheckpoint; private final List<Record> _recordList; @@ -40,7 +41,7 @@ public class KinesisFetchResult implements FetchResult<Record> { } @Override - public List<Record> getMessages() { - return _recordList; + public KinesisRecordsBatch getMessages() { + return new KinesisRecordsBatch(_recordList); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java new file mode 100644 index 0000000..ed51f8f --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -0,0 +1,52 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.List; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import software.amazon.awssdk.services.kinesis.model.Record; + + +public class KinesisRecordsBatch implements MessageBatch<byte[]> { + private List<Record> _recordList; + + public KinesisRecordsBatch(List<Record> recordList) { + _recordList = recordList; + } + + @Override + public int getMessageCount() { + return _recordList.size(); + } + + @Override + public byte[] getMessageAtIndex(int index) { + return _recordList.get(index).data().asByteArray(); + } + + @Override + public int getMessageOffsetAtIndex(int index) { + //TODO: Doesn't translate to offset. Needs to be replaced. + return _recordList.get(index).hashCode(); + } + + @Override + public int getMessageLengthAtIndex(int index) { + return _recordList.get(index).data().asByteArray().length; + } + + @Override + public RowMetadata getMetadataAtIndex(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getNextStreamMessageOffsetAtIndex(int index) { + throw new UnsupportedOperationException(); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index 17691c4..6f660f7 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -48,10 +48,11 @@ public class KinesisConsumerTest { KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L); - List<Record> list = fetchResult.getMessages(); + KinesisRecordsBatch list = fetchResult.getMessages(); + int n = list.getMessageCount(); - for (Record record : list) { - System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String()); + for (int i=0;i<n;i++) { + System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i)); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java index 9d14473..2188ac9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java @@ -19,10 +19,11 @@ package org.apache.pinot.spi.stream.v2; import java.util.List; +import org.apache.pinot.spi.stream.MessageBatch; public interface FetchResult<T> { Checkpoint getLastCheckpoint(); - List<T> getMessages(); + MessageBatch<T> getMessages(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org