This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a3deab3ea5154f03debf10751e273dc1a82ec919 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Tue Jan 5 01:19:28 2021 +0530 Add support for stream partition offsets (#6402) --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 3 +- .../plugin/stream/kinesis/KinesisConsumer.java | 10 +++++-- .../stream/kinesis/KinesisConsumerFactory.java | 5 ++++ .../stream/kinesis/KinesisMsgOffsetFactory.java | 32 ++++++++++++++++++++++ .../plugin/stream/kinesis/KinesisRecordsBatch.java | 15 ++++++---- 5 files changed, 56 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 1b8f86e..d42f899 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -23,10 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.util.Map; import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.JsonUtils; -public class KinesisCheckpoint implements Checkpoint { +public class KinesisCheckpoint implements StreamPartitionMsgOffset { private Map<String, String> _shardToStartSequenceMap; public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 8a24208..8ed3de7 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -80,6 +80,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti createConnection(); } + //TODO: iterate upon all the shardIds in the map Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next(); String shardIterator = getShardIterator(next.getKey(), next.getValue()); @@ -125,7 +126,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } - return new KinesisRecordsBatch(recordList); + return new KinesisRecordsBatch(recordList, next.getKey()); } catch (ProvisionedThroughputExceededException e) { LOG.warn("The request rate for the stream is too high", e); return handleException(kinesisStartCheckpoint, recordList); @@ -147,13 +148,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti } private KinesisRecordsBatch handleException(KinesisCheckpoint start, List<Record> recordList) { + String shardId = start.getShardToStartSequenceMap().entrySet().iterator().next().getKey(); + if (recordList.size() > 0) { String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap()); newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber); - return new KinesisRecordsBatch(recordList); + + return new KinesisRecordsBatch(recordList, shardId); } else { - return new KinesisRecordsBatch(recordList); + return new KinesisRecordsBatch(recordList, shardId); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index aa90812..631f240 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -25,6 +25,7 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamLevelConsumer; import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; public class KinesisConsumerFactory extends StreamConsumerFactory { @@ -55,4 +56,8 @@ public class KinesisConsumerFactory extends StreamConsumerFactory { return new KinesisConsumer(new KinesisConfig(_streamConfig)); } + @Override + public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() { + return new KinesisMsgOffsetFactory(); + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java new file mode 100644 index 0000000..f234bae --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java @@ -0,0 +1,32 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.io.IOException; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; + + +public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory { + + KinesisConfig _kinesisConfig; + + @Override + public void init(StreamConfig streamConfig) { + _kinesisConfig = new KinesisConfig(streamConfig); + } + + @Override + public StreamPartitionMsgOffset create(String offsetStr) { + try { + return new KinesisCheckpoint(offsetStr); + }catch (IOException e){ + return null; + } + } + + @Override + public StreamPartitionMsgOffset create(StreamPartitionMsgOffset other) { + return new KinesisCheckpoint(((KinesisCheckpoint) other).getShardToStartSequenceMap()); + } + +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java index 04bf4e6..fb4bfb3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.plugin.stream.kinesis; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -27,9 +29,11 @@ import software.amazon.awssdk.services.kinesis.model.Record; public class KinesisRecordsBatch implements MessageBatch<byte[]> { private List<Record> _recordList; + private String _shardId; - public KinesisRecordsBatch(List<Record> recordList) { + public KinesisRecordsBatch(List<Record> recordList, String shardId) { _recordList = recordList; + _shardId = shardId; } @Override @@ -39,13 +43,12 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> { @Override public byte[] getMessageAtIndex(int index) { - return _recordList.get(index).data().asByteArray(); + return _recordList.get(index).data().asByteBuffer().array(); } @Override public int getMessageOffsetAtIndex(int index) { - //TODO: Doesn't translate to offset. Needs to be replaced. - return _recordList.get(index).hashCode(); + return _recordList.get(index).data().asByteBuffer().arrayOffset(); } @Override @@ -60,7 +63,9 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> { @Override public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { - throw new UnsupportedOperationException(); + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber()); + return new KinesisCheckpoint(shardToSequenceMap); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org