This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 26085a88ac34d6b00737bbd68b8ff409ad281467 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Tue Dec 22 20:42:05 2020 +0530 Change shard metadata logic --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 2 +- .../plugin/stream/kinesis/KinesisCheckpoint.java | 2 +- .../stream/kinesis/KinesisConsumerFactory.java | 2 +- .../plugin/stream/kinesis/KinesisFetchResult.java | 2 +- .../kinesis/KinesisPartitionGroupMetadataMap.java | 55 +++++++++++++++++++--- .../stream/kinesis/KinesisShardMetadata.java | 16 +++---- 6 files changed, 60 insertions(+), 19 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 1abc536..0c9ae0b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -35,7 +35,7 @@ <properties> <pinot.root>${basedir}/../../..</pinot.root> <phase.prop>package</phase.prop> - <aws.version>2.13.46</aws.version> + <aws.version>2.15.50</aws.version> </properties> <dependencies> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 450173c..8de95e2 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -38,7 +38,7 @@ public class KinesisCheckpoint implements Checkpoint { } @Override - public Checkpoint deserialize(byte[] blob) { + public KinesisCheckpoint deserialize(byte[] blob) { //TODO: Implement SerDe return new KinesisCheckpoint(new String(blob)); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index da39aab..acac1fb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -38,7 +38,7 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public PartitionGroupMetadataMap getPartitionGroupsMetadata( PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion()); + return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index 52dab66..aedcd5d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -35,7 +35,7 @@ public class KinesisFetchResult implements FetchResult<Record> { } @Override - public Checkpoint getLastCheckpoint() { + public KinesisCheckpoint getLastCheckpoint() { return _kinesisCheckpoint; } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index 9a34004..d77579e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -19,7 +19,11 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; @@ -30,19 +34,56 @@ import software.amazon.awssdk.services.kinesis.model.Shard; public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap { private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>(); - public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) { + public KinesisPartitionGroupMetadataMap(String stream, String awsRegion, + PartitionGroupMetadataMap partitionGroupMetadataMap) { + //TODO: Handle child shards. Do not consume data from child shard unless parent is finished. + //Return metadata only for shards in current metadata super(stream, awsRegion); + KinesisPartitionGroupMetadataMap currentPartitionMeta = + (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap; + List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList(); + List<Shard> shardList = getShards(); + + Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>(); + for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) { + KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; + metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata); + } + for (Shard shard : shardList) { - String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); - String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); - shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber)); - shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); - _stringPartitionGroupMetadataIndex.add(shardMetadata); + if (metadataMap.containsKey(shard.shardId())) { + //Return existing shard metadata + _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId())); + } else if (metadataMap.containsKey(shard.parentShardId())) { + KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId()); + if (isProcessingFinished(kinesisShardMetadata)) { + //Add child shards for processing since parent has finished + appendShardMetadata(stream, awsRegion, shard); + } else { + //Do not process this shard unless the parent shard is finished or expired + } + } else { + //This is a new shard with no parents. We can start processing this shard. + appendShardMetadata(stream, awsRegion, shard); + } } } + private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) { + return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata + .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber()); + } + + private void appendShardMetadata(String stream, String awsRegion, Shard shard) { + String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); + String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); + KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); + shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber)); + shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); + _stringPartitionGroupMetadataIndex.add(shardMetadata); + } + @Override public List<PartitionGroupMetadata> getMetadataList() { return _stringPartitionGroupMetadataIndex; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 8141cd4..327e034 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -25,11 +25,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; - +//TODO: Implement shardId as Array public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { String _shardId; - Checkpoint _startCheckpoint; - Checkpoint _endCheckpoint; + KinesisCheckpoint _startCheckpoint; + KinesisCheckpoint _endCheckpoint; public KinesisShardMetadata(String shardId, String streamName, String awsRegion) { super(streamName, awsRegion); @@ -43,23 +43,23 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa } @Override - public Checkpoint getStartCheckpoint() { + public KinesisCheckpoint getStartCheckpoint() { return _startCheckpoint; } @Override - public Checkpoint getEndCheckpoint() { + public KinesisCheckpoint getEndCheckpoint() { return _endCheckpoint; } @Override public void setStartCheckpoint(Checkpoint startCheckpoint) { - _startCheckpoint = startCheckpoint; + _startCheckpoint = (KinesisCheckpoint) startCheckpoint; } @Override public void setEndCheckpoint(Checkpoint endCheckpoint) { - _endCheckpoint = endCheckpoint; + _endCheckpoint = (KinesisCheckpoint) endCheckpoint; } @Override @@ -68,7 +68,7 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa } @Override - public PartitionGroupMetadata deserialize(byte[] blob) { + public KinesisShardMetadata deserialize(byte[] blob) { return null; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org