This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7a4fccc3ad68f72f363f1663f6956c4b2aa6cc78 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 01:25:13 2020 +0530 Move shardId out of checkpoint to partition group metadata --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 14 ++------------ .../plugin/stream/kinesis/KinesisConsumer.java | 21 +++++++++------------ .../stream/kinesis/KinesisConsumerFactory.java | 2 +- .../kinesis/KinesisPartitionGroupMetadataMap.java | 4 +++- .../plugin/stream/kinesis/KinesisShardMetadata.java | 5 ++--- 5 files changed, 17 insertions(+), 29 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 8448665..aa80b17 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -4,11 +4,9 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; public class KinesisCheckpoint implements Checkpoint { - String _shardId; String _sequenceNumber; - public KinesisCheckpoint(String shardId, String sequenceNumber){ - _shardId = shardId; + public KinesisCheckpoint(String sequenceNumber){ _sequenceNumber = sequenceNumber; } @@ -16,14 +14,6 @@ public class KinesisCheckpoint implements Checkpoint { return _sequenceNumber; } - public String getShardId() { - return _shardId; - } - - public void setShardId(String shardId) { - _shardId = shardId; - } - @Override public byte[] serialize() { return _sequenceNumber.getBytes(); @@ -32,7 +22,7 @@ public class KinesisCheckpoint implements Checkpoint { @Override public Checkpoint deserialize(byte[] blob) { //TODO: Implement SerDe - return new KinesisCheckpoint("", new String(blob)); + return new KinesisCheckpoint(new String(blob)); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 7bc1006..d896d67 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -7,6 +7,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.FetchResult; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; @@ -18,18 +19,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { String _stream; Integer _maxRecords; + String _shardId; - //TODO: Fetch AWS region from Stream Config. - public KinesisConsumer(String stream, String awsRegion) { - super(stream, awsRegion); - _stream = stream; - _maxRecords = 20; - } - - public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) { - super(stream, awsRegion); + public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) { + super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global")); _stream = stream; _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20")); + KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; + _shardId = kinesisShardMetadata.getShardId(); } @Override @@ -73,7 +70,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); @@ -86,11 +83,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume if(kinesisStartCheckpoint.getSequenceNumber() != null) { String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .startingSequenceNumber(kinesisStartSequenceNumber).build()); } else{ getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); + GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); } return getShardIteratorResponse.shardIterator(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index bdbc348..0608118 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { - return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig); + return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index d15804e..700ec3f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -16,9 +16,11 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i super(stream, awsRegion); List<Shard> shardList = getShards(); for(Shard shard : shardList){ + String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); - shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber)); + shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber)); + shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); _stringPartitionGroupMetadataIndex.add(shardMetadata); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 693b307..e1d23da 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -15,9 +15,8 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa public KinesisShardMetadata(String shardId, String streamName, String awsRegion) { super(streamName, awsRegion); - - _startCheckpoint = new KinesisCheckpoint(shardId, null); - _endCheckpoint = new KinesisCheckpoint(shardId, null); + _startCheckpoint = null; + _endCheckpoint = null; _shardId = shardId; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org