This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 00db7a285a2e49e1364cd39a338bc115b50ce250 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Sun Dec 20 23:49:27 2020 +0530 Refactor code --- .../pinot/plugin/stream/kinesis/KinesisConnectionHandler.java | 6 ++++++ .../org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index c41598e..ba94b0a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -59,4 +59,10 @@ public class KinesisConnectionHandler { _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build()); return listShardsResponse.shards(); } + + public void close(){ + if(_kinesisClient != null) { + _kinesisClient.close(); + } + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index dfd6cda..24810ba 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -46,7 +46,6 @@ import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -//TODO: Handle exceptions and timeout public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { String _stream; Integer _maxRecords; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org