This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9a4bb60bcc4cdd5125b1516cc3e1596243f91075
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Jan 9 14:32:33 2024 +0100

    CAMEL-20297 camel-aws2-kinesis: do not swallow interrupted exceptions
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 27 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index f913870826e..93209375dcd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -82,7 +82,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                             .getAsyncClient(getEndpoint())
                             .describeStream(request)
                             .get();
-                } catch (ExecutionException | InterruptedException e) {
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return 0;
+                } catch (ExecutionException e) {
                     throw new RuntimeException(e);
                 }
             } else {
@@ -123,10 +126,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             final Shard shard,
             final KinesisConnection kinesisConnection,
             AtomicInteger processedExchangeCount) {
-        String shardIterator;
+        String shardIterator = null;
         try {
             shardIterator = getShardIterator(shard, kinesisConnection);
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
             throw new RuntimeException(e);
         }
 
@@ -151,7 +157,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .getAsyncClient(getEndpoint())
                         .getRecords(req)
                         .get();
-            } catch (ExecutionException | InterruptedException e) {
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         } else {
@@ -229,7 +238,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                             .getAsyncClient(getEndpoint())
                             .getShardIterator(request.build())
                             .get();
-                } catch (ExecutionException | InterruptedException e) {
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
                     throw new RuntimeException(e);
                 }
             } else {
@@ -348,7 +360,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .listShards(request)
                         .get()
                         .shards();
-            } catch (ExecutionException | InterruptedException e) {
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         } else {

Reply via email to