This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9a4bb60bcc4cdd5125b1516cc3e1596243f91075 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Jan 9 14:32:33 2024 +0100 CAMEL-20297 camel-aws2-kinesis: do not swallow interrupted exceptions --- .../component/aws2/kinesis/Kinesis2Consumer.java | 27 +++++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index f913870826e..93209375dcd 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -82,7 +82,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .getAsyncClient(getEndpoint()) .describeStream(request) .get(); - } catch (ExecutionException | InterruptedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return 0; + } catch (ExecutionException e) { throw new RuntimeException(e); } } else { @@ -123,10 +126,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R final Shard shard, final KinesisConnection kinesisConnection, AtomicInteger processedExchangeCount) { - String shardIterator; + String shardIterator = null; try { shardIterator = getShardIterator(shard, kinesisConnection); - } catch (ExecutionException | InterruptedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -151,7 +157,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .getAsyncClient(getEndpoint()) .getRecords(req) .get(); - } catch (ExecutionException | InterruptedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } else { @@ -229,7 +238,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .getAsyncClient(getEndpoint()) .getShardIterator(request.build()) .get(); - } catch (ExecutionException | InterruptedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } else { @@ -348,7 +360,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .listShards(request) .get() .shards(); - } catch (ExecutionException | InterruptedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } else {