ableegoldman commented on code in PR #16074:
URL: https://github.com/apache/kafka/pull/16074#discussion_r1613953516
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster
fullMetadata,
log.debug("Assigning tasks and {} standby replicas to client nodes {}",
numStandbyReplicas(), clientStates);
- final TaskAssignor taskAssignor =
createTaskAssignor(lagComputationSuccessful);
-
- final RackAwareTaskAssignor rackAwareTaskAssignor = new
RackAwareTaskAssignor(
- fullMetadata,
- partitionsForTask,
- changelogTopics.changelogPartionsForTask(),
- tasksForTopicGroup,
- racksForProcessConsumer,
- internalTopicManager,
- assignmentConfigs,
- time
- );
- final boolean probingRebalanceNeeded =
taskAssignor.assign(clientStates,
- allTasks,
-
statefulTasks,
-
rackAwareTaskAssignor,
-
assignmentConfigs);
+ final
Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>
userTaskAssignor =
+ userTaskAssignorSupplier.get();
+ boolean probingRebalanceNeeded;
+ if (userTaskAssignor.isPresent()) {
+ final ApplicationState applicationState = buildApplicationState(
+ taskManager.topologyMetadata(),
+ clientMetadataMap,
+ topicGroups,
+ fullMetadata
+ );
+ final TaskAssignment taskAssignment =
userTaskAssignor.get().assign(applicationState);
+ processStreamsPartitionAssignment(clientMetadataMap,
taskAssignment);
+ probingRebalanceNeeded =
taskAssignment.assignment().stream().anyMatch(assignment -> {
Review Comment:
This isn't correct -- a "probing rebalance" is a very specific type of
followup rebalance related to the HA assignor where Streams will schedule it
for 10 min out. Whereas the `followupRebalanceDeadline` could be an immediate
follow rebalance or a probing rebalance or any custom deadline.
This is why I think we need to do the other end first and work backwards, ie
use the TaskAssignment to build up the individual consumer assignments. The
followup rebalance logic is a lot more "nuanced" (to put it nicely) and can't
really be simplified in this way. We should put this PR on hold until that one
is merged
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]