lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1707299652
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
- private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final long currentTimeMs;
+ private final Collection<TopicPartition> partitions;
- public AssignmentChangeEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets, final long currentTimeMs) {
- super(Type.ASSIGNMENT_CHANGE);
- this.offsets = Collections.unmodifiableMap(offsets);
+ public AssignmentChangeEvent(final long currentTimeMs, final long
remainingTImeMs, final Collection<TopicPartition> partitions) {
Review Comment:
Instead of `remainingTimeMs`, should we pass `deadlineMs`, as is done for
the other CompletableEvents? Just for consistency.
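To illustrate the difference between the two conventions, here is a minimal, self-contained sketch. `DeadlineSketch` and both of its methods are hypothetical names invented for this example and are not part of the Kafka code base. It assumes non-negative inputs and shows a relative timeout being converted once into an absolute deadline, from which the remaining time can be recomputed at any later point.

```java
public class DeadlineSketch {
    /**
     * Converts a relative timeout into an absolute deadline.
     * Assumes both arguments are non-negative; saturates at Long.MAX_VALUE
     * instead of overflowing when the timeout is "infinite".
     */
    public static long calculateDeadlineMs(long currentTimeMs, long timeoutMs) {
        if (timeoutMs > Long.MAX_VALUE - currentTimeMs) {
            return Long.MAX_VALUE;
        }
        return currentTimeMs + timeoutMs;
    }

    /** Time left until the deadline, never negative. */
    public static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }
}
```

An absolute deadline stays valid however long the event waits in a queue, whereas a remaining-time value is only meaningful at the instant it was computed.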
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
- private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final long currentTimeMs;
Review Comment:
I notice that this is only needed to `updateAutoCommitTimer` when processing
an `AssignmentChangeEvent`. But we already have a `Time` in the
`ApplicationEventProcessor`. So could we simplify here: remove this
`currentTimeMs` and pass `time.milliseconds()` as the argument on line 204 of
the `ApplicationEventProcessor`?
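A rough sketch of the shape this suggestion points at, using only hypothetical stand-in types (`ProcessorClockSketch`, `AssignmentChange`, and `Processor` are invented for illustration and do not mirror the real classes): the event carries just the partitions, and the processor reads its own clock at the moment it handles the event.

```java
import java.util.List;
import java.util.function.LongSupplier;

public class ProcessorClockSketch {
    /** Hypothetical event: carries the partitions only, no timestamp. */
    public static final class AssignmentChange {
        public final List<String> partitions;

        public AssignmentChange(List<String> partitions) {
            this.partitions = partitions;
        }
    }

    /** Hypothetical processor that owns the clock, standing in for a `Time` field. */
    public static final class Processor {
        private final LongSupplier clock;
        private long lastTimerUpdateMs = -1L;

        public Processor(LongSupplier clock) {
            this.clock = clock;
        }

        public void process(AssignmentChange event) {
            // Read the time when the event is processed, not when it was created.
            lastTimerUpdateMs = clock.getAsLong();
        }

        public long lastTimerUpdateMs() {
            return lastTimerUpdateMs;
        }
    }
}
```

Injecting the clock also keeps the processor easy to test with a fixed time source.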
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1443,15 @@ public void assign(Collection<TopicPartition>
partitions) {
// be no following rebalance.
//
// See the ApplicationEventProcessor.process() method that handles
this event for more detail.
- applicationEventHandler.add(new
AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
- log.info("Assigned to partition(s): {}",
partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
-
- if (subscriptions.assignFromUser(new HashSet<>(partitions)))
- applicationEventHandler.add(new
NewTopicsMetadataUpdateRequestEvent());
+ Timer timer = time.timer(Long.MAX_VALUE);
Review Comment:
Should we use the `defaultApiTimeoutMs` here instead? Just to be sure that,
in the error case where the assign event does not complete, we don't block
forever.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1748,8 +1767,7 @@ public void testMultipleBackgroundErrors() {
final KafkaException expectedException2 = new KafkaException("Spam,
Spam, Spam");
final ErrorEvent errorEvent2 = new ErrorEvent(expectedException2);
backgroundEventQueue.add(errorEvent2);
- consumer.assign(singletonList(new TopicPartition("topic", 0)));
- final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
+ final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.assign(singletonList(new TopicPartition("topic", 0))));
Review Comment:
Oh, I hadn't realized this undesired side effect of processing background
events on assign, but of course, I missed that. I would say we don't want this
change in behaviour.
We only need to ensure that the assign does not return until the background
event completes, so we don't really need to process background events; we just
need `applicationEventHandler.addAndGet(assignmentChangeEvent)` (and I would
ensure we create the event using the `defaultApiTimeout` for the deadline).
Makes sense? Then we would keep the same behaviour we had on this test (assign
does not throw any background event, poll does).
Just for the record, on the unsubscribe we do need to process background
events because of the rebalanceCallbacks that run in the app thread, but that's
not the case here.
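The blocking pattern described above can be sketched roughly as follows. Everything here is a hypothetical stand-in written for illustration: `AddAndGetSketch`, its `Event` type, and `startBackgroundThread` are invented names, and this `addAndGet` is simplified to return a boolean, so it is not the signature of the handler method named in the comment. The point is only that the calling thread enqueues the event and waits on its future up to a bounded deadline, without draining any other queue in the meantime.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AddAndGetSketch {
    /** Hypothetical completable event with an absolute deadline. */
    public static final class Event {
        public final CompletableFuture<Void> future = new CompletableFuture<>();
        public final long deadlineMs;

        public Event(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    /** Starts a daemon thread that completes every event it takes from the queue. */
    public void startBackgroundThread() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    queue.take().future.complete(null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    /**
     * Enqueues the event and blocks until it completes or its deadline passes.
     * Returns true if the event completed in time, false on timeout or interrupt.
     */
    public boolean addAndGet(Event event) {
        queue.add(event);
        long remainingMs = Math.max(0L, event.deadlineMs - System.currentTimeMillis());
        try {
            event.future.get(remainingMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With a finite deadline, a caller whose event is never processed gets control back after the timeout instead of blocking forever, which is the concern raised about `Long.MAX_VALUE`.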