agavra commented on code in PR #16074:
URL: https://github.com/apache/kafka/pull/16074#discussion_r1613728858
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##########
@@ -253,6 +253,21 @@ public TaskAssignor taskAssignor() {
}
}
+ public org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor() {
+ final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+ if (userTaskAssignorClassname == null) {
+ return null;
+ }
+ try {
+ return Utils.newInstance(userTaskAssignorClassname, org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ "Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
Review Comment:
Let's include the name of the class that was passed in, to aid debugging.
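A minimal sketch of what that could look like. It is self-contained, so it uses plain reflection (`Class.forName`) as a stand-in for `Utils.newInstance`; the class name `AssignorLoadingSketch`, the helper `newInstance`, and the exact message wording are hypothetical.

```java
final class AssignorLoadingSketch {
    // Config key copied from the diff; everything else here is illustrative.
    static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";

    // Instantiates className via its no-arg constructor and checks it against base.
    static <T> T newInstance(final String className, final Class<T> base) {
        try {
            return Class.forName(className)
                .asSubclass(base)
                .getDeclaredConstructor()
                .newInstance();
        } catch (final ClassNotFoundException e) {
            // The offending class name is part of the message, as requested.
            throw new IllegalArgumentException(
                "Expected an instantiable class name for " + TASK_ASSIGNOR_CLASS_CONFIG
                    + " but got " + className, e);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Failed to instantiate " + className + " for " + TASK_ASSIGNOR_CLASS_CONFIG, e);
        }
    }
}
```

With this shape, a typo in the configured value shows up directly in the exception text instead of requiring a look at the config dump.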
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -820,6 +820,9 @@ public class StreamsConfig extends AbstractConfig {
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
+ @SuppressWarnings("WeakerAccess")
+ public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
+ private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class.";
Review Comment:
Let's make sure to include what a value of `null` means here (which I
believe is the "real" default at this moment)
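One possible wording, as a sketch only. It assumes, based on the `userTaskAssignor()` hunk in this PR, that `null` means no user-provided assignor is configured and the built-in assignor is used; that assumption should be confirmed before adopting the text. The holder class `TaskAssignorDocSketch` is hypothetical.

```java
final class TaskAssignorDocSketch {
    static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";

    // Assumed wording, not text taken from the PR.
    static final String TASK_ASSIGNOR_CLASS_DOC =
        "A task assignor class or class name implementing the "
            + "<code>org.apache.kafka.streams.processor.assignment.TaskAssignor</code> interface. "
            + "The default value is <code>null</code>, which means no user-provided assignor is "
            + "configured and the built-in task assignor is used.";
}
```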
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
log.debug("Assigning tasks and {} standby replicas to client nodes {}",
numStandbyReplicas(), clientStates);
- final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
-
- final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
- fullMetadata,
- partitionsForTask,
- changelogTopics.changelogPartionsForTask(),
- tasksForTopicGroup,
- racksForProcessConsumer,
- internalTopicManager,
- assignmentConfigs,
- time
- );
- final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
- allTasks,
- statefulTasks,
- rackAwareTaskAssignor,
- assignmentConfigs);
+ final org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor =
+ createUserTaskAssignor(lagComputationSuccessful);
+ boolean probingRebalanceNeeded = false;
+ if (userTaskAssignor == null) {
Review Comment:
nit: consider extracting the `if` and `else` branches into separate methods
that each return a `boolean` (whether a probing rebalance is necessary), then
assign the result directly so `probingRebalanceNeeded` can be `final`
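Roughly the shape I have in mind, as a self-contained sketch. The interfaces `LegacyAssignor` and `UserAssignor` and both helper method names are hypothetical stand-ins, and returning `false` from the user-assignor branch is an assumption, not something this PR states.

```java
final class ProbingRebalanceSketch {
    // Hypothetical stand-in for the existing assignor, whose assign() reports
    // whether a probing rebalance is needed.
    interface LegacyAssignor {
        boolean assign();
    }

    // Hypothetical stand-in for the new user-supplied assignor.
    interface UserAssignor {
        void assign();
    }

    static boolean assignTasksToClients(final UserAssignor userTaskAssignor,
                                        final LegacyAssignor legacyAssignor) {
        // Assigned exactly once on each branch, so the local can be final.
        final boolean probingRebalanceNeeded;
        if (userTaskAssignor == null) {
            probingRebalanceNeeded = assignWithLegacyAssignor(legacyAssignor);
        } else {
            probingRebalanceNeeded = assignWithUserAssignor(userTaskAssignor);
        }
        return probingRebalanceNeeded;
    }

    private static boolean assignWithLegacyAssignor(final LegacyAssignor legacyAssignor) {
        return legacyAssignor.assign();
    }

    private static boolean assignWithUserAssignor(final UserAssignor userTaskAssignor) {
        userTaskAssignor.assign();
        // Assumption for illustration: the user path never requests a probing rebalance.
        return false;
    }
}
```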
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -793,6 +811,14 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
return probingRebalanceNeeded;
}
+ private org.apache.kafka.streams.processor.assignment.TaskAssignor createUserTaskAssignor(final boolean lagComputationSuccessful) {
+ if (!lagComputationSuccessful) {
+ log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
+ + "trigger another rebalance to retry.");
+ }
Review Comment:
It seems odd for this log statement to be part of this function. Is it
relevant only when we're creating a user assignor?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##########
@@ -253,6 +253,21 @@ public TaskAssignor taskAssignor() {
}
}
+ public org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor() {
Review Comment:
consider returning `Optional<...>` to indicate that `null` is a semantically
valid result
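A sketch of the `Optional` variant, kept self-contained by using plain reflection instead of `Utils.newInstance`. The nested `TaskAssignor` interface, `NoOpAssignor`, and the class name `OptionalAssignorSketch` are hypothetical stand-ins for the real types.

```java
import java.util.Optional;

final class OptionalAssignorSketch {
    // Hypothetical stand-in for org.apache.kafka.streams.processor.assignment.TaskAssignor.
    interface TaskAssignor { }

    // Trivial implementation so the sketch has something to instantiate.
    static final class NoOpAssignor implements TaskAssignor { }

    // Empty means "no user assignor configured", which callers must handle explicitly.
    static Optional<TaskAssignor> userTaskAssignor(final String className) {
        if (className == null) {
            return Optional.empty();
        }
        try {
            final TaskAssignor assignor = Class.forName(className)
                .asSubclass(TaskAssignor.class)
                .getDeclaredConstructor()
                .newInstance();
            return Optional.of(assignor);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Expected an instantiable class name but got " + className, e);
        }
    }
}
```

The call site can then branch with `isPresent()` or `map(...).orElseGet(...)` rather than a `null` check.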