This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 173b3c4097654239c92413625aaf018c406dfab2
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Mon Dec 21 14:21:55 2020 +0530

    Handle closed connections
---
 .../plugin/stream/kinesis/KinesisConnectionHandler.java      | 12 +++++++++---
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  |  8 ++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index ba94b0a..3607787 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -49,9 +49,7 @@ public class KinesisConnectionHandler {
   public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient =
-        
KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-            .build();
+    createConnection();
   }
 
   public List<Shard> getShards() {
@@ -60,9 +58,17 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
+  public void createConnection(){
+    if(_kinesisClient == null) {
+      _kinesisClient = 
KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+          .build();
+    }
+  }
+
   public void close(){
     if(_kinesisClient != null) {
       _kinesisClient.close();
+      _kinesisClient = null;
     }
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 24810ba..fd48a92 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -79,6 +79,10 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
 
     try {
 
+      if(_kinesisClient == null){
+        createConnection();
+      }
+
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
       String kinesisEndSequenceNumber = null;
@@ -176,4 +180,8 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to