This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new f9f2419  Avoid writing 'stream' and also 'stream.kinesis.topic.name'
f9f2419 is described below

commit f9f2419fef4d60a1ccd87ef58dc8dea08b411df3
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Mon Feb 1 18:35:56 2021 -0800

    Avoid writing 'stream' and also 'stream.kinesis.topic.name'
---
 .../java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java  | 6 ++++--
 .../pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java | 4 +++-
 .../test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java | 3 +--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 6e46498..0e8cc8a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
@@ -27,7 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
  * Kinesis stream specific config
  */
 public class KinesisConfig {
-  public static final String STREAM = "stream";
+  public static final String STREAM_TYPE = "kinesis";
   public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
   public static final String AWS_REGION = "aws-region";
   public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
@@ -45,7 +46,8 @@ public class KinesisConfig {
   }
 
   public String getStream() {
-    return _props.get(STREAM);
+    return _props
+        .get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME));
   }
 
   public String getAwsRegion() {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
index 1e832fa..324d559 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
@@ -34,7 +35,8 @@ public class KinesisConsumerIntegrationTest {
   public static void main(String[] args)
       throws IOException {
     Map<String, String> props = new HashMap<>();
-    props.put(KinesisConfig.STREAM, STREAM_NAME);
+    props.put(StreamConfigProperties
+        .constructStreamProperty(KinesisConfig.STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME), STREAM_NAME);
     props.put(KinesisConfig.AWS_REGION, AWS_REGION);
     props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
     props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
index 28d02de..f58cf24 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
@@ -31,7 +31,6 @@ public class TestUtils {
 
   public static StreamConfig getStreamConfig() {
     Map<String, String> props = new HashMap<>();
-    props.put(KinesisConfig.STREAM, STREAM_NAME);
     props.put(KinesisConfig.AWS_REGION, AWS_REGION);
     props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
     props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
@@ -46,7 +45,7 @@ public class TestUtils {
 
   public static KinesisConfig getKinesisConfig() {
     Map<String, String> props = new HashMap<>();
-    props.put(KinesisConfig.STREAM, STREAM_NAME);
+    props.put("stream.kinesis.topic.name", STREAM_NAME);
     props.put(KinesisConfig.AWS_REGION, AWS_REGION);
     props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
     props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to