This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 96c32c461b006a15cdf34170a40f076e09be73cc Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 11:35:18 2020 +0530 Add Kinesis config wrapper --- .../pinot/plugin/stream/kinesis/KinesisConfig.java | 29 ++++++++ .../plugin/stream/kinesis/KinesisConsumer.java | 78 ++++++++++++---------- .../stream/kinesis/KinesisConsumerFactory.java | 10 ++- 3 files changed, 74 insertions(+), 43 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java new file mode 100644 index 0000000..01d666a --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -0,0 +1,29 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.StreamConfig; + + +public class KinesisConfig { + private final StreamConfig _streamConfig; + private static final String AWS_REGION = "aws-region"; + private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; + + private static final String DEFAULT_AWS_REGION = "us-central-1"; + private static final String DEFAULT_MAX_RECORDS = "20"; + + public KinesisConfig(StreamConfig streamConfig) { + _streamConfig = streamConfig; + } + + public String getStream(){ + return _streamConfig.getTopicName(); + } + + public String getAwsRegion(){ + return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION); + } + + public Integer maxRecordsToFetch(){ + return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 7670f06..96241d4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -30,71 +30,75 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; - +//TODO: Handle exceptions and timeout public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { String _stream; Integer _maxRecords; String _shardId; - public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) { - super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global")); - _stream = stream; - _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20")); + public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { + super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); + _stream = kinesisConfig.getStream(); + _maxRecords = kinesisConfig.maxRecordsToFetch(); KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; _shardId = kinesisShardMetadata.getShardId(); } @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { - KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; + try { + KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; - String shardIterator = getShardIterator(kinesisStartCheckpoint); + String shardIterator = getShardIterator(kinesisStartCheckpoint); - List<Record> recordList = new ArrayList<>(); + List<Record> recordList = new ArrayList<>(); - String kinesisEndSequenceNumber = null; + String kinesisEndSequenceNumber = null; - if (end != null) { - KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; - kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); - } + if (end != null) { + KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; + kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); + } - String nextStartSequenceNumber = null; - Long startTimestamp = System.currentTimeMillis(); + String nextStartSequenceNumber = null; + Long startTimestamp = System.currentTimeMillis(); - while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) { - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); + while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) { + GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); + GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - if (getRecordsResponse.records().size() > 0) { - recordList.addAll(getRecordsResponse.records()); - nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + if (getRecordsResponse.records().size() > 0) { + recordList.addAll(getRecordsResponse.records()); + nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - if (kinesisEndSequenceNumber != null - && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { - nextStartSequenceNumber = kinesisEndSequenceNumber; - break; - } + if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { + nextStartSequenceNumber = kinesisEndSequenceNumber; + break; + } - if (recordList.size() >= _maxRecords) { - break; + if (recordList.size() >= _maxRecords) { + break; + } } - } - shardIterator = getRecordsResponse.nextShardIterator(); - } + shardIterator = getRecordsResponse.nextShardIterator(); + } - if (nextStartSequenceNumber == null && recordList.size() > 0) { - nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - } + if (nextStartSequenceNumber == null && recordList.size() > 0) { + nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + } - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); - KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); - return kinesisFetchResult; + return kinesisFetchResult; + }catch (KinesisException e){ + return null; + } } private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index 931fa07..da39aab 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -28,19 +28,17 @@ import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2; public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { - private StreamConfig _streamConfig; - private final String AWS_REGION = "aws-region"; + private KinesisConfig _kinesisConfig; @Override public void init(StreamConfig streamConfig) { - _streamConfig = streamConfig; + _kinesisConfig = new KinesisConfig(streamConfig); } @Override public PartitionGroupMetadataMap getPartitionGroupsMetadata( PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), - _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global")); + return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion()); } @Override @@ -50,6 +48,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { - return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata); + return new KinesisConsumer(_kinesisConfig, metadata); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org