This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 173b3c4097654239c92413625aaf018c406dfab2 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Mon Dec 21 14:21:55 2020 +0530 Handle closed connections --- .../plugin/stream/kinesis/KinesisConnectionHandler.java | 12 +++++++++--- .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 8 ++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index ba94b0a..3607787 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -49,9 +49,7 @@ public class KinesisConnectionHandler { public KinesisConnectionHandler(String stream, String awsRegion) { _stream = stream; _awsRegion = awsRegion; - _kinesisClient = - KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) - .build(); + createConnection(); } public List<Shard> getShards() { @@ -60,9 +58,17 @@ public class KinesisConnectionHandler { return listShardsResponse.shards(); } + public void createConnection(){ + if(_kinesisClient == null) { + _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) + .build(); + } + } + public void close(){ if(_kinesisClient != null) { _kinesisClient.close(); + _kinesisClient = null; } } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 24810ba..fd48a92 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -79,6 +79,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume try { + if(_kinesisClient == null){ + createConnection(); + } + String shardIterator = getShardIterator(kinesisStartCheckpoint); String kinesisEndSequenceNumber = null; @@ -176,4 +180,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume return getShardIteratorResponse.shardIterator(); } + @Override + public void close() { + super.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org