mjsax commented on code in PR #20511:
URL: https://github.com/apache/kafka/pull/20511#discussion_r2337602146


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -187,23 +187,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      */
     private class BackgroundEventProcessor implements 
EventProcessor<BackgroundEvent> {
 
-        private Optional<StreamsRebalanceListener> streamsRebalanceListener = 
Optional.empty();
-        private final Optional<StreamsRebalanceData> streamsRebalanceData;
+        private final Optional<StreamsRebalanceListenerInvoker> 
streamsRebalanceListenerInvoker;
 
         public BackgroundEventProcessor() {
-            this.streamsRebalanceData = Optional.empty();
+            this.streamsRebalanceListenerInvoker = Optional.empty();
         }
 
-        public BackgroundEventProcessor(final Optional<StreamsRebalanceData> 
streamsRebalanceData) {
-            this.streamsRebalanceData = streamsRebalanceData;
-        }
-
-        private void setStreamsRebalanceListener(final 
StreamsRebalanceListener streamsRebalanceListener) {
-            if (streamsRebalanceData.isEmpty()) {
-                throw new IllegalStateException("Background event processor 
was not created to be used with Streams " +
-                    "rebalance protocol events");
-            }
-            this.streamsRebalanceListener = 
Optional.of(streamsRebalanceListener);
+        public BackgroundEventProcessor(final 
Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker) {

Review Comment:
   For this constructor, it seems not to make sense to pass in an `Optional`? 
If we are in the KS case, and use this constructor, we must pass the 
`StreamsRebalanceListenerInvoker`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,44 +269,28 @@ private void 
processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
 
         private StreamsOnTasksRevokedCallbackCompletedEvent 
invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> 
activeTasksToRevoke,
                                                                                
          final CompletableFuture<Void> future) {
-            final Optional<Exception> exceptionFromCallback = 
streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+            final Optional<Exception> exceptionFromCallback = 
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
             final Optional<KafkaException> error = exceptionFromCallback.map(e 
-> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws 
an error"));

Review Comment:
   Would this work as expected, given that `exceptionFromCallback` is created 
as ofNullable? It seems, if `e == null`, we would set `error = new 
KafkaException("Task assignment callback throws an error", null)` what does not 
look correct?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are 
invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+    private final Logger log;
+
+    private final StreamsRebalanceData streamsRebalanceData;
+    private Optional<StreamsRebalanceListener> listener;

Review Comment:
   Given that we use `StreamsRebalanceListenerInvoker` as `Optional` (what does 
make sense), I am wondering if we should use `Optional` for the `listener`? We 
would only use `StreamsRebalanceListenerInvoker` for the KS case, but if we are 
in the KS case, we must have a `StreamsRebalanceListener`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,44 +269,28 @@ private void 
processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
 
         private StreamsOnTasksRevokedCallbackCompletedEvent 
invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> 
activeTasksToRevoke,
                                                                                
          final CompletableFuture<Void> future) {
-            final Optional<Exception> exceptionFromCallback = 
streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+            final Optional<Exception> exceptionFromCallback = 
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
             final Optional<KafkaException> error = exceptionFromCallback.map(e 
-> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws 
an error"));
             return new StreamsOnTasksRevokedCallbackCompletedEvent(future, 
error);
         }
 
         private StreamsOnTasksAssignedCallbackCompletedEvent 
invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
                                                                                
            final CompletableFuture<Void> future) {
             final Optional<KafkaException> error;
-            final Optional<Exception> exceptionFromCallback = 
streamsRebalanceListener().onTasksAssigned(assignment);
-            if (exceptionFromCallback.isPresent()) {
-                error = 
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
 "Task assignment callback throws an error"));
-            } else {
-                error = Optional.empty();
-                streamsRebalanceData().setReconciledAssignment(assignment);
-            }
+            final Optional<Exception> exceptionFromCallback = 
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksAssigned(assignment));
+            error = exceptionFromCallback.map(e -> 
ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an 
error"));

Review Comment:
   As above (same elsewhere).



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are 
invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+    private final Logger log;
+
+    private final StreamsRebalanceData streamsRebalanceData;
+    private Optional<StreamsRebalanceListener> listener;
+
+    StreamsRebalanceListenerInvoker(LogContext logContext, 
StreamsRebalanceData streamsRebalanceData) {
+        this.log = logContext.logger(getClass());
+        this.listener = Optional.empty();
+        this.streamsRebalanceData = streamsRebalanceData;
+    }
+
+    public void setRebalanceListener(StreamsRebalanceListener 
streamsRebalanceListener) {
+        this.listener = Optional.ofNullable(streamsRebalanceListener);

Review Comment:
   I don't think that `streamsRebalanceListener` should be allowed to be `null`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1964,7 +1957,11 @@ public void subscribe(Collection<String> topics, 
ConsumerRebalanceListener liste
 
     public void subscribe(Collection<String> topics, StreamsRebalanceListener 
streamsRebalanceListener) {
         subscribeInternal(topics, Optional.empty());
-        
backgroundEventProcessor.setStreamsRebalanceListener(streamsRebalanceListener);
+        if (streamsRebalanceListenerInvoker.isPresent()) {

Review Comment:
   Can we simplify this to `orElseThrow` ?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1532,21 +1512,34 @@ private void runRebalanceCallbacksOnClose() {
 

Review Comment:
   Cannot comment above, but there is:
   ```
           if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == 
null)
               return;
   ```
   
   I assume we only have `rebalanceListenerInvoker` set, for the plain consumer 
case? So for KS case, we would `return` above? Don't we need to change this 
code, too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to