cadonna commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1512305606
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2016,13 +2015,14 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
- reset(consumer);
- expectConsumerAssignmentPaused(consumer);
- replay(consumer);
taskManager.handleRebalanceComplete();
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
verify(stateDirectory);
+
+ final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
Review Comment:
I see `singleton(new TopicPartition("assignment", 0))` added around 45 times
in this PR. I assume this is because `handleAssignment()` uses it internally. I
propose defining a `static final` field named `assignment` or similar that can
be reused. Alternatively, the assignment could be passed to
`handleAssignment()`. To limit the changes in this PR, I would go for the
former option, the field.
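
A minimal sketch of the field option, not the actual test class. The class name
`SharedAssignmentSketch` and the constant name `ASSIGNMENT` are assumptions, and
the nested `TopicPartition` record is only a stand-in for
`org.apache.kafka.common.TopicPartition` so that the snippet compiles without
Kafka on the classpath:

```java
import java.util.Collections;
import java.util.Set;

public class SharedAssignmentSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
    record TopicPartition(String topic, int partition) { }

    // Hypothetical shared constant: defined once, reused by every test method
    // instead of re-creating the same singleton set locally.
    static final Set<TopicPartition> ASSIGNMENT =
        Collections.singleton(new TopicPartition("assignment", 0));

    public static void main(String[] args) {
        // Each test would refer to ASSIGNMENT rather than declare its own local variable.
        System.out.println(ASSIGNMENT);
    }
}
```

`Collections.singleton` returns an immutable set, so sharing one instance across
tests should not let one test affect another.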
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
restoringTask.setChangelogOffsets(changelogOffsets);
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+ final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+ Mockito.verify(mockitoConsumer).resume(assignment);
Review Comment:
With EasyMock you need to call `verify()` to verify a call. I do not see a
call to `verify()` in the old code. That tells me that verifying the call to
`consumer.resume()` was not relevant in the old code of this test.
Additionally, the goal of this test is to verify the result of
`taskManager.getTaskOffsetSums()`. For that you need some tasks assigned, so
part of the setup of this test is to install an assignment. With
`Mockito.verify(mockitoConsumer).resume(assignment)` you would be testing a
rather unimportant part of the setup of this test.
To sum up, I have the impression that the migration verifies more than the old
code did, and that the added verification is rather unimportant.
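
To illustrate the point without depending on EasyMock or Mockito, here is a
rough sketch that uses a hand-rolled recording fake. Every name in it
(`RecordingConsumer`, `OffsetTracker`, `runScenario`) is hypothetical and does
not come from `TaskManagerTest`. Installing the assignment is setup, and the
check is made only on the computed result:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VerifyOnlyTheGoalSketch {
    // Hypothetical fake that records calls, standing in for a mocked consumer.
    static class RecordingConsumer {
        final List<String> resumed = new ArrayList<>();

        void resume(String partition) {
            resumed.add(partition);
        }
    }

    // Hypothetical component under test.
    static class OffsetTracker {
        private final RecordingConsumer consumer;
        private final Map<String, Long> offsets = new HashMap<>();

        OffsetTracker(RecordingConsumer consumer) {
            this.consumer = consumer;
        }

        void handleAssignment(String partition, long offset) {
            offsets.put(partition, offset);
            consumer.resume(partition); // side effect of the setup, not the goal of the test
        }

        long offsetSum() {
            long sum = 0L;
            for (long offset : offsets.values()) {
                sum += offset;
            }
            return sum;
        }
    }

    // Setup installs an assignment; only the returned sum is what a test would check.
    static long runScenario() {
        OffsetTracker tracker = new OffsetTracker(new RecordingConsumer());
        tracker.handleAssignment("assignment-0", 5L);
        tracker.handleAssignment("assignment-1", 7L);
        return tracker.offsetSum();
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

The fake records the `resume` calls, but nothing inspects `resumed`, which
mirrors a test that stubs what it needs and asserts only on its actual goal.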
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
task00.setCommittableOffsetsAndMetadata(offsets);
// first `handleAssignment`
- expectRestoreToBeCompleted(consumer);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
- expectLastCall();
-
- // `handleRevocation`
- consumer.commitSync(offsets);
- expectLastCall();
+ final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+ when(mockitoConsumer.assignment()).thenReturn(assignment);
- // second `handleAssignment`
- consumer.commitSync(offsets);
- expectLastCall();
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
- replay(consumer);
+ taskManager.setMainConsumer(mockitoConsumer);
Review Comment:
Fair enough! Waiting for your follow-up PR then.