mjsax commented on code in PR #20511:
URL: https://github.com/apache/kafka/pull/20511#discussion_r2337602146
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -187,23 +187,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
*/
private class BackgroundEventProcessor implements
EventProcessor<BackgroundEvent> {
- private Optional<StreamsRebalanceListener> streamsRebalanceListener =
Optional.empty();
- private final Optional<StreamsRebalanceData> streamsRebalanceData;
+ private final Optional<StreamsRebalanceListenerInvoker>
streamsRebalanceListenerInvoker;
public BackgroundEventProcessor() {
- this.streamsRebalanceData = Optional.empty();
+ this.streamsRebalanceListenerInvoker = Optional.empty();
}
- public BackgroundEventProcessor(final Optional<StreamsRebalanceData>
streamsRebalanceData) {
- this.streamsRebalanceData = streamsRebalanceData;
- }
-
- private void setStreamsRebalanceListener(final
StreamsRebalanceListener streamsRebalanceListener) {
- if (streamsRebalanceData.isEmpty()) {
- throw new IllegalStateException("Background event processor
was not created to be used with Streams " +
- "rebalance protocol events");
- }
- this.streamsRebalanceListener =
Optional.of(streamsRebalanceListener);
+ public BackgroundEventProcessor(final
Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker) {
Review Comment:
For this constructor, it seems not to make sense to pass in an `Optional`?
If we are in the KS case, and use this constructor, we must pass the
`StreamsRebalanceListenerInvoker`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,44 +269,28 @@ private void
processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
private StreamsOnTasksRevokedCallbackCompletedEvent
invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId>
activeTasksToRevoke,
final CompletableFuture<Void> future) {
- final Optional<Exception> exceptionFromCallback =
streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+ final Optional<Exception> exceptionFromCallback =
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
final Optional<KafkaException> error = exceptionFromCallback.map(e
-> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws
an error"));
Review Comment:
Would this work as expected, given that `exceptionFromCallback` is created
as ofNullable? It seems, if `e == null`, we would set `error = new
KafkaException("Task assignment callback throws an error", null)` what does not
look correct?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in
the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are
invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+ private final Logger log;
+
+ private final StreamsRebalanceData streamsRebalanceData;
+ private Optional<StreamsRebalanceListener> listener;
Review Comment:
Given that we use `StreamsRebalanceListenerInvoker` as `Optional` (what does
make sense), I am wondering if we should use `Optional` for the `listener`? We
would only use `StreamsRebalanceListenerInvoker` for the KS case, but if we are
in the KS case, we must have a `StreamsRebalanceListener`, right?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,44 +269,28 @@ private void
processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
private StreamsOnTasksRevokedCallbackCompletedEvent
invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId>
activeTasksToRevoke,
final CompletableFuture<Void> future) {
- final Optional<Exception> exceptionFromCallback =
streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+ final Optional<Exception> exceptionFromCallback =
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
final Optional<KafkaException> error = exceptionFromCallback.map(e
-> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws
an error"));
return new StreamsOnTasksRevokedCallbackCompletedEvent(future,
error);
}
private StreamsOnTasksAssignedCallbackCompletedEvent
invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
- final Optional<Exception> exceptionFromCallback =
streamsRebalanceListener().onTasksAssigned(assignment);
- if (exceptionFromCallback.isPresent()) {
- error =
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
"Task assignment callback throws an error"));
- } else {
- error = Optional.empty();
- streamsRebalanceData().setReconciledAssignment(assignment);
- }
+ final Optional<Exception> exceptionFromCallback =
Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksAssigned(assignment));
+ error = exceptionFromCallback.map(e ->
ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an
error"));
Review Comment:
As above (same elsewhere).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in
the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are
invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+ private final Logger log;
+
+ private final StreamsRebalanceData streamsRebalanceData;
+ private Optional<StreamsRebalanceListener> listener;
+
+ StreamsRebalanceListenerInvoker(LogContext logContext,
StreamsRebalanceData streamsRebalanceData) {
+ this.log = logContext.logger(getClass());
+ this.listener = Optional.empty();
+ this.streamsRebalanceData = streamsRebalanceData;
+ }
+
+ public void setRebalanceListener(StreamsRebalanceListener
streamsRebalanceListener) {
+ this.listener = Optional.ofNullable(streamsRebalanceListener);
Review Comment:
I don't think that `streamsRebalanceListener` should be allowed to be `null`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1964,7 +1957,11 @@ public void subscribe(Collection<String> topics,
ConsumerRebalanceListener liste
public void subscribe(Collection<String> topics, StreamsRebalanceListener
streamsRebalanceListener) {
subscribeInternal(topics, Optional.empty());
-
backgroundEventProcessor.setStreamsRebalanceListener(streamsRebalanceListener);
+ if (streamsRebalanceListenerInvoker.isPresent()) {
Review Comment:
Can we simplify this to `orElseThrow` ?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1532,21 +1512,34 @@ private void runRebalanceCallbacksOnClose() {
Review Comment:
Cannot comment above, but there is:
```
if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker ==
null)
return;
```
I assume we only have `rebalanceListenerInvoker` set, for the plain consumer
case? So for KS case, we would `return` above? Don't we need to change this
code, too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]