This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit be19cf6866235d7bf4ce0a74424ae2378f40c8bc
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Mon Dec 21 14:25:25 2020 +0530

    Refactor: get shard iterator methods
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fd48a92..3263f87 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    GetShardIteratorResponse getShardIteratorResponse;
-
     if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
     } else {
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(ShardIteratorType.LATEST).build());
+      return getShardIterator(ShardIteratorType.LATEST, null);
     }
+  }
 
-    return getShardIteratorResponse.shardIterator();
+  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
+    if(sequenceNumber == null){
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(shardIteratorType).build()).shardIterator();
+    }else{
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(shardIteratorType)
+              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    }
   }
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org