lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1721809134
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1442,10 @@ public void assign(Collection<TopicPartition>
partitions) {
// be no following rebalance.
//
// See the ApplicationEventProcessor.process() method that handles
this event for more detail.
- applicationEventHandler.add(new
AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
- log.info("Assigned to partition(s): {}",
partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
-
- if (subscriptions.assignFromUser(new HashSet<>(partitions)))
- applicationEventHandler.add(new
NewTopicsMetadataUpdateRequestEvent());
+ Timer timer = time.timer(defaultApiTimeoutMs);
+ AssignmentChangeEvent assignmentChangeEvent = new
AssignmentChangeEvent(timer.currentTimeMs(), calculateDeadlineMs(timer),
partitions);
+ applicationEventHandler.addAndGet(assignmentChangeEvent);
+ log.info("Assigned new partitions");
Review Comment:
nit: should we remove this log line? We already have a very similar one that
will show "Assigned to partitions...." when the actual change happens
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
final long currentTimeMs = 12345;
return Stream.of(
Arguments.of(new PollEvent(100)),
- Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new ResetPositionsEvent(500)),
Arguments.of(new ValidatePositionsEvent(500)),
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
- Arguments.of(new AssignmentChangeEvent(offset,
currentTimeMs)));
+ Arguments.of(new AssignmentChangeEvent(12345, 12345,
Collections.emptyList())));
Review Comment:
note that by adding this we're only testing that the call to
`process(AppEvent)` ends up triggering the `process(AssignmentChangeEvent)`,
nothing more. So should we add a test for the `process(AssignmentChangeEvent)`?
We should ensure that it takes the actions we expect (commit, assign, request
metadata update), using a `processor` instance, not a mock.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -2182,6 +2207,16 @@ private void
completeUnsubscribeApplicationEventSuccessfully() {
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
+ private void completeAssignmentChangeEventSuccessfully() {
+ doAnswer(invocation -> {
+ AssignmentChangeEvent event = invocation.getArgument(0);
+ // In AsyncKafkaConsumer, the subscription is updated in the
background thread.
Review Comment:
nit: this inline comment doesn't seem to add much imo, the func name and
body seem clear enough to me
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
final long currentTimeMs = 12345;
Review Comment:
this is not used anymore so let's just remove it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]