This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new f9f2419 Avoid writing 'stream' and also 'stream.kinesis.topic.name' f9f2419 is described below commit f9f2419fef4d60a1ccd87ef58dc8dea08b411df3 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Mon Feb 1 18:35:56 2021 -0800 Avoid writing 'stream' and also 'stream.kinesis.topic.name' --- .../java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java | 6 ++++-- .../pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java | 4 +++- .../test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java | 3 +-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index 6e46498..0e8cc8a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.Map; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @@ -27,7 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; * Kinesis stream specific config */ public class KinesisConfig { - public static final String STREAM = "stream"; + public static final String STREAM_TYPE = "kinesis"; public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type"; public static final String AWS_REGION = "aws-region"; public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; @@ -45,7 +46,8 @@ public class KinesisConfig { } public String getStream() { - return _props.get(STREAM); + return _props + .get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME)); } public String getAwsRegion() { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java index 1e832fa..324d559 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.spi.stream.StreamConfigProperties; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @@ -34,7 +35,8 @@ public class KinesisConsumerIntegrationTest { public static void main(String[] args) throws IOException { Map<String, String> props = new HashMap<>(); - props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put(StreamConfigProperties + .constructStreamProperty(KinesisConfig.STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME), STREAM_NAME); props.put(KinesisConfig.AWS_REGION, AWS_REGION); props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java index 28d02de..f58cf24 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java @@ -31,7 +31,6 @@ public class TestUtils { public static StreamConfig getStreamConfig() { Map<String, String> props = new HashMap<>(); - props.put(KinesisConfig.STREAM, STREAM_NAME); props.put(KinesisConfig.AWS_REGION, AWS_REGION); props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); @@ -46,7 +45,7 @@ public class TestUtils { public static KinesisConfig getKinesisConfig() { Map<String, String> props = new HashMap<>(); - props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put("stream.kinesis.topic.name", STREAM_NAME); props.put(KinesisConfig.AWS_REGION, AWS_REGION); props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org