ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
                 + "tasks for source topics vs changelog topics.");
         }
-        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
Review Comment:
very small nit (tack onto any followup PR): weird line break; either keep everything on one line, or move the `cluster` variable to the second line with the other params
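For concreteness, the one-line form would look like this (illustrative sketch only; indentation approximate, and not meant as an applied suggestion given the followup-PR note):

```suggestion
                RackUtils.annotateTopicPartitionsWithRackInfo(cluster, internalTopicManager, topicsRequiringRackInfo);
```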
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
                 + "tasks for source topics vs changelog topics.");
         }
-        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
+                    internalTopicManager, topicsRequiringRackInfo);
+                rackInformationFetched.set(true);
+            }
+        };
+
         final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsForTask = new HashMap<>();
+        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
         logicalTaskIds.forEach(taskId -> {
             final Set<TaskTopicPartition> topicPartitions = new HashSet<>();
             for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) {
                 final boolean isSource = true;
                 final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition);
                 final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, fetchRackInformation);
+                topicsRequiringRackInfo.add(racklessTopicPartition);
                 topicPartitions.add(racklessTopicPartition);
             }
             for (final TopicPartition topicPartition : changelogPartitionsForTask.get(taskId)) {
                 final boolean isSource = sourcePartitionsForTask.get(taskId).contains(topicPartition);
                 final boolean isChangelog = true;
                 final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, fetchRackInformation);
+                if (publicAssignmentConfigs.numStandbyReplicas() > 0) {
Review Comment:
Note that active tasks will also read from changelog topics (though only during the restore phase), so we should be adding changelogs to the `topicsRequiringRackInfo` set even if there are no standbys configured.

Again, you can tack this onto PR #17 or whatever PR is next
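As background for the `fetchRackInformation` runnable introduced in the hunk above: it is a run-once guard built from an `AtomicBoolean`, so the (potentially expensive) rack-info lookup fires at most once however many partitions lazily trigger it. A minimal standalone sketch of that pattern (the class and counter names here are hypothetical, not from the PR):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceSketch {
    // Counts how many times the "expensive" fetch body actually executes.
    static final AtomicInteger fetchCount = new AtomicInteger(0);
    static final AtomicBoolean fetched = new AtomicBoolean(false);

    // Same shape as fetchRackInformation: lazily triggered by each partition,
    // but the fetch body runs at most once.
    static final Runnable fetchRackInformation = () -> {
        if (!fetched.get()) {
            fetchCount.incrementAndGet(); // stand-in for the rack-info lookup
            fetched.set(true);
        }
    };

    public static void main(String[] args) {
        fetchRackInformation.run();
        fetchRackInformation.run();
        fetchRackInformation.run();
        System.out.println(fetchCount.get()); // prints 1
    }
}
```

Note the `get()`/`set()` pair is not atomic; if multiple threads could invoke the runnable concurrently, `compareAndSet(false, true)` would be the race-free variant, though the assignment codepath here appears to run single-threaded.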
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
         ));
         return new DefaultApplicationState(
-            assignmentConfigs.toPublicAssignmentConfigs(),
+            publicAssignmentConfigs,
             logicalTasks,
             clientMetadataMap
         );
     }

-    private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                          final TaskAssignment taskAssignment) {
+    private void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+                                                   final TaskAssignment taskAssignment,
+                                                   final AssignmentError assignmentError,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final GroupSubscription groupSubscription) {
+        if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+            assignor.onAssignmentComputed(new GroupAssignment(Collections.emptyMap()), groupSubscription, assignmentError);
+            log.error("Task assignment returning empty GroupAssignment and failing due to error {}", assignmentError);
Review Comment:
Technically speaking we're not returning any GroupAssignment at all for the "task assignment", i.e. the rebalance, since we throw this exception instead of returning anything from #assign. It's a subtle difference in wording, but I think it's worth clarifying to avoid confusion here.
```suggestion
            log.error("Rebalance failed due to task assignor returning assignment with error {}, assignor callback will receive empty GroupAssignment due to this error", assignmentError);
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -790,10 +814,17 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
             final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
             final TaskAssignment taskAssignment = assignor.assign(applicationState);
             final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
-            processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
-            userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+            processStreamsPartitionAssignment(assignor, taskAssignment, assignmentError, clientMetadataMap, groupSubscription);
+            customTaskAssignmentListener = (assignment, subscription) -> {
+                assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+                if (assignmentError != AssignmentError.NONE) {
+                    log.error("Task assignment returning empty GroupAssignment and failing due to error {}", assignmentError);
Review Comment:
The "returning empty GroupAssignment" part of the log is just talking about
what we're passing in to the #onAssignmentComputed callback, which in this case
is not an empty GroupAssignment.
```suggestion
                    log.error("Rebalance failed due to task assignor returning assignment with error {}", assignmentError);
```
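To make the callback-then-fail ordering in this hunk concrete: the listener always delivers the real computed assignment to `#onAssignmentComputed` first, and only afterwards fails the rebalance if the user assignor reported an error. A self-contained sketch of that control flow using plain JDK types (the enum, listener, and exception below are stand-ins, not the actual Kafka Streams classes):

```java
public class ListenerFlowSketch {
    enum AssignmentError { NONE, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID }

    static String lastCallback = null;

    // Stand-in for assignor.onAssignmentComputed(assignment, subscription, error).
    static void onAssignmentComputed(String assignment, AssignmentError error) {
        lastCallback = assignment + ":" + error;
    }

    // Mirrors the customTaskAssignmentListener shape: the user callback always
    // fires with the real assignment, then the rebalance fails on error.
    static void listener(String assignment, AssignmentError error) {
        onAssignmentComputed(assignment, error);
        if (error != AssignmentError.NONE) {
            throw new IllegalStateException(
                "Rebalance failed due to task assignor returning assignment with error " + error);
        }
    }

    public static void main(String[] args) {
        listener("assignment-A", AssignmentError.NONE); // succeeds quietly
        try {
            listener("assignment-B", AssignmentError.UNKNOWN_TASK_ID);
        } catch (IllegalStateException e) {
            // The callback still saw the real (non-empty) assignment before the failure.
            System.out.println(lastCallback); // prints assignment-B:UNKNOWN_TASK_ID
        }
    }
}
```

This is why the log wording matters: the callback argument here is the real assignment, so "returning empty GroupAssignment" would misdescribe what the listener passed along.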
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]