[
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865569#comment-17865569
]
Ao Li commented on KAFKA-17112:
-------------------------------
[~cadonna] Aren't stateUpdater and processingThread expected to be started in
some tests since the parameterized tests control them.
{code}
@Parameter(0)
public boolean stateUpdaterEnabled = true;
@Parameter(1)
public boolean processingThreadsEnabled = true;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false}, {true, false}, {true, true}
});
}
{code}
and here
{code}
private static StateUpdater maybeCreateAndStartStateUpdater(final boolean
stateUpdaterEnabled,
final
StreamsMetricsImpl streamsMetrics,
final
StreamsConfig streamsConfig,
final
Consumer<byte[], byte[]> restoreConsumer,
final
ChangelogReader changelogReader,
final
TopologyMetadata topologyMetadata,
final Time time,
final String
clientId,
final int
threadIdx) {
if (stateUpdaterEnabled) {
final String name = clientId + "-StateUpdater-" + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
streamsConfig,
restoreConsumer,
changelogReader,
topologyMetadata,
time
);
stateUpdater.start();
return stateUpdater;
} else {
return null;
}
}
{code}
> StreamThread shutdown calls completeShutdown only in CREATED state
> ------------------------------------------------------------------
>
> Key: KAFKA-17112
> URL: https://issues.apache.org/jira/browse/KAFKA-17112
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 3.9.0
> Reporter: Ao Li
> Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed
> the test left many lingering threads. Though the class runs `shutdown` after
> each test, the shutdown only executes `completeShutdown` if the StreamThread
> is in CREATED state. See
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
> and
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>
> For example, you may run test
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
> with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus,
> `completeShutdown` is not called. The test creates three lingering threads: 2
> `StateUpdater` and 1 `TaskExecutor`
>
> This means that calls to `thread.shutdown` has no effect in
> `StreamThreadTest.java`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)