Muralidhar Basani created KAFKA-20403:
-----------------------------------------
Summary: Streams: InterruptedException swallowed without restoring
thread interrupt flag in multiple classes
Key: KAFKA-20403
URL: https://issues.apache.org/jira/browse/KAFKA-20403
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Muralidhar Basani
Assignee: Muralidhar Basani
Several places in the streams module catch InterruptedException without
calling Thread.currentThread().interrupt() to restore the interrupt flag. This
goes against Java's interrupt contract: code that catches InterruptedException
should either re-throw it or restore the flag so that callers up the stack know
the thread was interrupted.
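
To illustrate the contract described above, here is a minimal sketch of the
two patterns. It is not taken from the Streams code: the class and method names
(InterruptRestoreSketch, awaitSwallowing, awaitRestoring) are hypothetical, and
a CountDownLatch stands in for whatever blocking call the real code makes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptRestoreSketch {

    // Hypothetical example of the problematic pattern: the exception is
    // caught, which clears the interrupt flag, and the flag is never restored.
    static boolean awaitSwallowing(CountDownLatch latch) {
        try {
            return latch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false; // the caller cannot tell that an interrupt happened
        }
    }

    // Hypothetical example of the pattern this ticket asks for: restore the
    // flag before returning so that callers up the stack can still observe it.
    static boolean awaitRestoring(CountDownLatch latch) {
        try {
            return latch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1); // never counted down

        // await() throws immediately when the flag is already set on entry.
        Thread.currentThread().interrupt();
        awaitSwallowing(latch);
        System.out.println("swallowed: " + Thread.currentThread().isInterrupted());

        Thread.currentThread().interrupt();
        awaitRestoring(latch);
        // Thread.interrupted() reads the flag and clears it again.
        System.out.println("restored:  " + Thread.interrupted());
    }
}
```

With the swallowing variant the caller sees the flag as false after the call;
with the restoring variant it sees true.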
We already do this correctly in other places such as TaskManager.java:719,
InternalTopicManager.java:558, and GlobalStreamThread.java:461, so these cases
were probably just missed. The impact of leaving them unfixed is low, since the
state flags and condition signals in the shutdown calls already take care of
these interruptions. This is therefore low priority and mostly a matter of code
correctness and consistency with the rest of the codebase.
In addition, KafkaStreamsNamedTopologyWrapper has two ex.printStackTrace()
calls (lines 308 and 333) that should be replaced with proper logging. That
class is deprecated and planned for removal, but we could still clean it up.
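
As a rough sketch of what replacing printStackTrace() with logging could look
like, the example below waits on a future, restores the interrupt flag, and
logs the exception. It does not reproduce the actual code at those lines: the
class and method names are hypothetical, and java.util.logging is used only to
keep the example self-contained, as a stand-in for the logger the class would
really use.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggedInterruptSketch {

    // Assumption: java.util.logging stands in for the project's own logger.
    static final Logger LOG = Logger.getLogger(LoggedInterruptSketch.class.getName());

    // Hypothetical helper: returns true if the future completed normally.
    static boolean waitFor(Future<?> future) {
        try {
            future.get();
            return true;
        } catch (InterruptedException e) {
            // Restore the flag first, then log instead of e.printStackTrace().
            Thread.currentThread().interrupt();
            LOG.log(Level.WARNING, "Interrupted while waiting for the future", e);
            return false;
        } catch (ExecutionException e) {
            LOG.log(Level.SEVERE, "Future completed exceptionally", e.getCause());
            return false;
        }
    }
}
```

Logging through a logger keeps the stack trace in the application's configured
log output rather than on stderr, where printStackTrace() writes it.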
Affected files:
- streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java (lines 880, 958)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java (lines 120, 269)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java (line 149)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java (line 232)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java (lines 308, 333)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java (line 56)
- streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (line 1788)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)