cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2107363103
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -446,29 +446,15 @@ private List<String> getTaskIdsAsStrings(final
KafkaStreams streams) {
private static Stream<Arguments> singleAndMultiTaskParameters() {
Review Comment:
This name does not really fit anymore. I propose to rename this method to
`topologyComplexityAndRebalanceProtocol`.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) {
streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG,
TestClientSupplier.class);
streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER,
TestConsumerWrapper.class);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- streamsConfiguration.putAll(extraProperties);
Review Comment:
This does not seem right. On line 472 the group protocol config is passed to
`props()`, but here it is ignored.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -395,18 +379,15 @@ public void handleAssignment(final Map<TaskId,
Set<TopicPartition>> activeTasks,
// 2. for tasks that have changed active/standby status, just recycle
and skip re-creating them
// 3. otherwise, close them since they are no longer owned
final Map<TaskId, RuntimeException> failedTasks = new
LinkedHashMap<>();
- if (stateUpdater == null) {
- handleTasksWithoutStateUpdater(activeTasksToCreate,
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
- } else {
- handleTasksWithStateUpdater(
- activeTasksToCreate,
- standbyTasksToCreate,
- tasksToRecycle,
- tasksToCloseClean,
- failedTasks
- );
-
failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
- }
+
+ handleTasksWithStateUpdater(
Review Comment:
Could you please rename this method to `handleTasks()`. We do not need to
distinguish the cases with and without state updater.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1909,7 +1772,7 @@ public void
shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro
}
@Test
- public void
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask()
{
Review Comment:
Could you please change
```java
final long changelogOffsetOfRunningTask = 42L;
```
to
```java
final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
```
to make the case more real?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean>
streamsUncaughtExceptionHandler) {
- final boolean stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
Review Comment:
There is still a system test that uses the config. It is
[`streams_upgrade_test.test_upgrade_downgrade_state_updater()`](https://github.com/apache/kafka/blob/1ded681684e771b16aa98ae751f39b9816345a83/tests/kafkatest/tests/streams/streams_upgrade_test.py#L178).
There is a comment that says:
```
Once same-thread state restoration is removed from the code, this test
should use different versions of the code.
```
I guess it means to only use a version before `3.8` (e.g. `LATEST_3_7`) for
the `from_version` and `DEV_VERSION` for the `to_version`. You need to choose a
version before `3.8` because before `3.8` the state updater was not enabled by
default.
@lucasbru did I correctly interpret your comment?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1940,57 +1803,6 @@ public void
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat
);
}
- @Test
- public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws
Exception {
Review Comment:
Could you please replace this test with the following:
```java
@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
final StreamTask restoringStatefulTask = statefulTask(taskId01,
taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final long changelogOffsetOfRestoringStandbyTask = 84L;
when(restoringStatefulTask.changelogOffsets())
.thenReturn(mkMap(
mkEntry(t1p1changelog,
changelogOffsetOfRestoringStandbyTask),
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
assertThat(
taskManager.taskOffsetSums(),
is(mkMap(
mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
))
);
}
```
where
```java
private final TopicPartition t1p1changelog2 = new
TopicPartition("changelog2", 1);
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1851,29 +1736,7 @@ public void
shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater()
}
@Test
- public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws
Exception {
Review Comment:
Could you please add the following test as a replacement for the this test:
```java
@Test
public void shouldComputeOffsetSumForRunningStatefulTask() {
final StreamTask runningStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
when(runningStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffsetOfRunningTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
assertThat(
taskManager.taskOffsetSums(),
is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
);
}
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2097,105 +1909,9 @@ public void
shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
- @Test
- public void
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
Review Comment:
I believe you cannot just delete all the tests that contain
`tryToCompleteRetoration()`. You need to rewrite them. Let me know if you need
help.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -870,12 +806,8 @@ private Map<TaskId, RuntimeException>
closeAndRecycleTasks(final Map<Task, Set<T
try {
if (oldTask.isActive()) {
final StandbyTask standbyTask =
convertActiveToStandby((StreamTask) oldTask, inputPartitions);
- if (stateUpdater != null) {
- tasks.removeTask(oldTask);
-
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
- } else {
- tasks.replaceActiveWithStandby(standbyTask);
Review Comment:
This
[method](https://github.com/apache/kafka/blob/0035ac06d33fbd427605cd107e3a1da1ff2061ce/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java#L58)
from the `TaskRegistry` interface is not used anymore. Could you please remove
it and its implementations?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1320,68 +1297,14 @@ public void
shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
}
@ParameterizedTest
- @MethodSource("data")
- public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws
InterruptedException {
- // The state updater is disabled for this test because this test
relies on the fact the mainConsumer.resume()
- // is not called. This is not true when the state updater is enabled
which leads to
- // java.lang.IllegalStateException: No current assignment for
partition topic1-2.
- // Since this tests verifies an aspect that is independent from the
state updater, it is OK to disable
- // the state updater and leave the rewriting of the test to later,
when the code path for disabled state updater
- // is removed.
Review Comment:
I believe, you need to rewrite this test as the comment says. Let me know,
if you need some help with that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]