This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b0d8c1b422e58013c48e07c5469698229936a621 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 00:54:16 2020 +0530 Fix consumer code --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 11 +-- .../plugin/stream/kinesis/KinesisCheckpoint.java | 15 +++- .../stream/kinesis/KinesisConnectionHandler.java | 21 +++++- .../plugin/stream/kinesis/KinesisConsumer.java | 88 +++++++++++++++------- .../stream/kinesis/KinesisConsumerFactory.java | 2 +- .../plugin/stream/kinesis/KinesisFetchResult.java | 8 +- .../kinesis/KinesisPartitionGroupMetadataMap.java | 9 +-- .../stream/kinesis/KinesisShardMetadata.java | 11 +-- 8 files changed, 112 insertions(+), 53 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 97e5eef..f863d17 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -15,7 +15,7 @@ <properties> <pinot.root>${basedir}/../../..</pinot.root> <phase.prop>package</phase.prop> - <aws.version>2.15.42</aws.version> + <aws.version>2.13.46</aws.version> </properties> <dependencies> @@ -24,12 +24,13 @@ <artifactId>kinesis</artifactId> <version>${aws.version}</version> </dependency> + <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-json</artifactId> - <version>${project.version}</version> - <scope>test</scope> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.12.0</version> </dependency> + <dependency> <groupId>org.apache.pinot</groupId> <artifactId>pinot-spi</artifactId> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 77f790b..8448665 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -4,9 +4,11 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; public class KinesisCheckpoint implements Checkpoint { + String _shardId; String _sequenceNumber; - public KinesisCheckpoint(String sequenceNumber){ + public KinesisCheckpoint(String shardId, String sequenceNumber){ + _shardId = shardId; _sequenceNumber = sequenceNumber; } @@ -14,6 +16,14 @@ public class KinesisCheckpoint implements Checkpoint { return _sequenceNumber; } + public String getShardId() { + return _shardId; + } + + public void setShardId(String shardId) { + _shardId = shardId; + } + @Override public byte[] serialize() { return _sequenceNumber.getBytes(); @@ -21,7 +31,8 @@ public class KinesisCheckpoint implements Checkpoint { @Override public Checkpoint deserialize(byte[] blob) { - return new KinesisCheckpoint(new String(blob)); + //TODO: Implement SerDe + return new KinesisCheckpoint("", new String(blob)); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 7ea24c0..d8888fa 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -1,25 +1,42 @@ package org.apache.pinot.plugin.stream.kinesis; +import java.util.List; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.ConsumerV2; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; public class KinesisConnectionHandler { - String _awsRegion = ""; + private String _stream; + private String _awsRegion; KinesisClient _kinesisClient; public KinesisConnectionHandler(){ } - public KinesisConnectionHandler(String awsRegion){ + public KinesisConnectionHandler(String stream, String awsRegion){ + _stream = stream; _awsRegion = awsRegion; _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build(); } + public List<Shard> getShards(){ + ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build()); + return listShardsResponse.shards(); + } + } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index dc44079..7bc1006 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -3,6 +3,7 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.FetchResult; @@ -16,57 +17,86 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { String _stream; + Integer _maxRecords; //TODO: Fetch AWS region from Stream Config. public KinesisConsumer(String stream, String awsRegion) { - super(awsRegion); + super(stream, awsRegion); _stream = stream; + _maxRecords = 20; + } + + public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) { + super(stream, awsRegion); + _stream = stream; + _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20")); } @Override - public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { + public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; - KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; - String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); - String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); + String shardIterator = getShardIterator(kinesisStartCheckpoint); - GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType( - ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build()); - - String shardIterator = getShardIteratorResponse.shardIterator(); - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); + List<Record> recordList = new ArrayList<>(); - String kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); + String kinesisEndSequenceNumber = null; - //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception. - if(!getRecordsResponse.hasRecords()){ - return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList()); + if(end != null) { + KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; + kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); } - List<Record> recordList = new ArrayList<>(); - recordList.addAll(getRecordsResponse.records()); + String nextStartSequenceNumber = null; + Long startTimestamp = System.currentTimeMillis(); + + while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){ + GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); + GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - while(kinesisNextShardIterator != null){ - getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build(); - getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - if(getRecordsResponse.hasRecords()){ + if(getRecordsResponse.records().size() > 0){ recordList.addAll(getRecordsResponse.records()); nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - } - if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) { - nextStartSequenceNumber = kinesisEndSequenceNumber; - break; + if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){ + nextStartSequenceNumber = kinesisEndSequenceNumber; + break; + } + + if(recordList.size() >= _maxRecords) break; } - kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); + + shardIterator = getRecordsResponse.nextShardIterator(); } - KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber, - getRecordsResponse.records()); + if(nextStartSequenceNumber == null && recordList.size() > 0){ + nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + } + + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber); + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, + recordList); return kinesisFetchResult; } + + private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) { + GetShardIteratorResponse getShardIteratorResponse; + + if(kinesisStartCheckpoint.getSequenceNumber() != null) { + String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); + getShardIteratorResponse = _kinesisClient.getShardIterator( + GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .startingSequenceNumber(kinesisStartSequenceNumber).build()); + } else{ + getShardIteratorResponse = _kinesisClient.getShardIterator( + GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); + } + + return getShardIteratorResponse.shardIterator(); + } + + private boolean isTimedOut(Long startTimestamp, Long timeout) { + return (System.currentTimeMillis() - startTimestamp) >= timeout; + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index 6bd1e3a..bdbc348 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { - return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1")); + return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index dc8e764..2996b28 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -8,17 +8,17 @@ import software.amazon.awssdk.services.kinesis.model.Record; public class KinesisFetchResult implements FetchResult<Record> { - private final String _nextShardIterator; + private final KinesisCheckpoint _kinesisCheckpoint; private final List<Record> _recordList; - public KinesisFetchResult(String nextShardIterator, List<Record> recordList){ - _nextShardIterator = nextShardIterator; + public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){ + _kinesisCheckpoint = kinesisCheckpoint; _recordList = recordList; } @Override public Checkpoint getLastCheckpoint() { - return new KinesisCheckpoint(_nextShardIterator); + return _kinesisCheckpoint; } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index 87f7235..d15804e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -13,13 +13,12 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>(); public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){ - super(awsRegion); - ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build()); - List<Shard> shardList = listShardsResponse.shards(); + super(stream, awsRegion); + List<Shard> shardList = getShards(); for(Shard shard : shardList){ String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream); - shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); + KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); + shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber)); _stringPartitionGroupMetadataIndex.add(shardMetadata); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 4a19285..693b307 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @@ -12,11 +13,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa Checkpoint _startCheckpoint; Checkpoint _endCheckpoint; - public KinesisShardMetadata(String shardId, String streamName) { - GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType( - ShardIteratorType.LATEST).streamName(streamName).build()); - _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator()); - _endCheckpoint = null; + public KinesisShardMetadata(String shardId, String streamName, String awsRegion) { + super(streamName, awsRegion); + + _startCheckpoint = new KinesisCheckpoint(shardId, null); + _endCheckpoint = new KinesisCheckpoint(shardId, null); _shardId = shardId; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org