This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 72a77c61226ce5cf3abd781916f7797a3fa5a230 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Thu Dec 10 19:08:41 2020 +0530 Add initial implementation of Kinesis consumer --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 39 ++++++++++++++++++ .../plugin/stream/kinesis/KinesisCheckpoint.java | 28 +++++++++++++ .../stream/kinesis/KinesisConnectionHandler.java | 25 ++++++++++++ .../plugin/stream/kinesis/KinesisConsumer.java | 40 ++++++++++++++++++ .../plugin/stream/kinesis/KinesisFetchResult.java | 25 ++++++++++++ .../stream/kinesis/KinesisShardMetadata.java | 47 ++++++++++++++++++++++ pinot-plugins/pinot-stream-ingestion/pom.xml | 1 + 7 files changed, 205 insertions(+) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml new file mode 100644 index 0000000..97e5eef --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>pinot-stream-ingestion</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>pinot-kinesis</artifactId> + + <properties> + <pinot.root>${basedir}/../../..</pinot.root> + <phase.prop>package</phase.prop> + <aws.version>2.15.42</aws.version> + </properties> + + <dependencies> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>kinesis</artifactId> + <version>${aws.version}</version> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-json</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-spi</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java new file mode 100644 index 0000000..a330e78 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -0,0 +1,28 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.v2.Checkpoint; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; + + +public class KinesisCheckpoint implements Checkpoint { + String _shardIterator; + + public KinesisCheckpoint(String shardIterator){ + _shardIterator = shardIterator; + } + + public String getShardIterator() { + return _shardIterator; + } + + @Override + public byte[] serialize() { + return _shardIterator.getBytes(); + } + + @Override + public Checkpoint deserialize(byte[] blob) { + return new KinesisCheckpoint(new String(blob)); + } + +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java new file mode 100644 index 0000000..7ea24c0 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -0,0 +1,25 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.v2.ConsumerV2; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + + +public class KinesisConnectionHandler { + String _awsRegion = ""; + KinesisClient _kinesisClient; + + public KinesisConnectionHandler(){ + + } + + public KinesisConnectionHandler(String awsRegion){ + _awsRegion = awsRegion; + _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build(); + } + +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java new file mode 100644 index 0000000..251d831 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -0,0 +1,40 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.Collections; +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.ConsumerV2; +import org.apache.pinot.spi.stream.v2.FetchResult; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; + + +public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { + + //TODO: Fetch AWS region from Stream Config. + public KinesisConsumer(String awsRegion) { + super(awsRegion); + } + + @Override + public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { + KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; + KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; + + String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator(); + + GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build(); + GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); + + String kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); + + if(!getRecordsResponse.hasRecords()){ + return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList()); + } + + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator, + getRecordsResponse.records()); + + return kinesisFetchResult; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java new file mode 100644 index 0000000..5ef4e30 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -0,0 +1,25 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.List; +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.FetchResult; +import software.amazon.awssdk.services.kinesis.model.Record; + + +public class KinesisFetchResult implements FetchResult { + private String _nextShardIterator; + + public KinesisFetchResult(String nextShardIterator, List<Record> recordList){ + _nextShardIterator = nextShardIterator; + } + + @Override + public Checkpoint getLastCheckpoint() { + return new KinesisCheckpoint(_nextShardIterator); + } + + @Override + public byte[] getMessages() { + return new byte[0]; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java new file mode 100644 index 0000000..07ede73 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -0,0 +1,47 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; + + +public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { + Checkpoint _startCheckpoint; + Checkpoint _endCheckpoint; + + public KinesisShardMetadata(String shardId, String streamName) { + GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build()); + _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator()); + } + + @Override + public Checkpoint getStartCheckpoint() { + return _startCheckpoint; + } + + @Override + public Checkpoint getEndCheckpoint() { + return _endCheckpoint; + } + + @Override + public void setStartCheckpoint(Checkpoint startCheckpoint) { + _startCheckpoint = startCheckpoint; + } + + @Override + public void setEndCheckpoint(Checkpoint endCheckpoint) { + _endCheckpoint = endCheckpoint; + } + + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public PartitionGroupMetadata deserialize(byte[] blob) { + return null; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml index 3a51626..e7b9a46 100644 --- a/pinot-plugins/pinot-stream-ingestion/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pom.xml @@ -42,6 +42,7 @@ <module>pinot-kafka-base</module> <module>pinot-kafka-0.9</module> <module>pinot-kafka-2.0</module> + <module>pinot-kinesis</module> </modules> </project> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org