ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623064697
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##########
@@ -263,6 +267,8 @@ public Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> user
final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname, org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname);
+ assignor.configure(streamsConfig.originals());
+ log.info("Configured task assignor {} with the StreamsConfig.", userTaskAssignorClassname);
Review Comment:
I think we only need to log something about the assignor once. The first
message is good enough, so I'd just remove this one.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -45,8 +45,8 @@
public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
- private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
- private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+ public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
Review Comment:
ditto here: let's use a different variable name in each class. It's too
confusing to mix them up now that they're being referenced from outside
this class. So I guess `DEFAULT_HIGH_AVAILABILITY_XXX_COST` (or maybe just
`DEFAULT_HA_XXX_COST`?)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1492,8 +1521,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
- case 6:
- validateActiveTaskEncoding(partitions, info, logPrefix);
+ case 6: validateActiveTaskEncoding(partitions, info, logPrefix);
Review Comment:
accidental formatting change?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -790,8 +813,14 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
final TaskAssignment taskAssignment = assignor.assign(applicationState);
final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
- processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
- userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+ processStreamsPartitionAssignment(assignor, taskAssignment, assignmentError, clientMetadataMap, groupSubscription);
+ userTaskAssignmentListener = (assignment, subscription) -> {
+ assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+ if (assignmentError != AssignmentError.NONE) {
+ throw new StreamsException("Task assignment with " + assignor.getClass() +
Review Comment:
ditto here: log an error before throwing and change `assignor.getClass()` to
`assignor.getClass().getName()`
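
To illustrate why the `getName()` change matters, here is a minimal sketch (not code from the PR). `MyAssignor` is a hypothetical stand-in for a user-supplied assignor, and the message suffix is made up for the example. Concatenating a `Class` object into a string calls `Class.toString()`, which prefixes the name with `class `, whereas `getName()` yields just the class name.

```java
public class ClassNameInMessage {
    // Hypothetical stand-in for a user-supplied assignor; not a Kafka class.
    static class MyAssignor { }

    static String withClass(final Object assignor) {
        // Concatenation invokes Class.toString(), which prepends "class ".
        return "Task assignment with " + assignor.getClass() + " failed";
    }

    static String withName(final Object assignor) {
        // getName() returns only the binary class name.
        return "Task assignment with " + assignor.getClass().getName() + " failed";
    }

    public static void main(final String[] args) {
        final Object assignor = new MyAssignor();
        System.out.println(withClass(assignor));
        System.out.println(withName(assignor));
    }
}
```

The first message reads "Task assignment with class ..." while the second names the class directly, which is easier to read in logs and exception messages.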
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
));
return new DefaultApplicationState(
- assignmentConfigs.toPublicAssignmentConfigs(),
+ publicAssignmentConfigs,
logicalTasks,
clientMetadataMap
);
}
- private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
- final TaskAssignment taskAssignment) {
+ private static void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+ final TaskAssignment taskAssignment,
+ final AssignmentError assignmentError,
+ final Map<UUID, ClientMetadata> clientMetadataMap,
+ final GroupSubscription groupSubscription) {
+ if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+ assignor.onAssignmentComputed(null, groupSubscription, assignmentError);
+ throw new StreamsException("Task assignment with " + assignor.getClass() +
Review Comment:
also we should log an error before throwing. It can say basically the same
thing as what's in the exception -- or maybe say something like "returning
empty GroupAssignment and failing due to error {}"
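
A rough sketch of the log-then-throw pattern being asked for, under several assumptions: the `AssignmentError` enum below is a trimmed, hypothetical copy holding only the values quoted in the diff, `logError` stands in for the class's real logger, `IllegalStateException` stands in for `StreamsException`, and the message wording is invented because the actual message is cut off in the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

public class LogBeforeThrowSketch {
    // Trimmed, hypothetical copy of the error enum; only values quoted in the diff.
    enum AssignmentError { NONE, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID }

    // Stand-in for the class logger so the sketch has no dependencies.
    static final List<String> ERROR_LOG = new ArrayList<>();

    static void logError(final String message) {
        ERROR_LOG.add(message);
        System.err.println("ERROR " + message);
    }

    static void failOnUnknownIds(final Object assignor, final AssignmentError error) {
        if (error == AssignmentError.UNKNOWN_PROCESS_ID || error == AssignmentError.UNKNOWN_TASK_ID) {
            // Assumed wording; the real exception message is truncated in the diff.
            final String message = "Task assignment with " + assignor.getClass().getName()
                + " failed with error " + error;
            // Log first so the failure is recorded even if the exception is swallowed upstream.
            logError(message);
            // IllegalStateException stands in for StreamsException here.
            throw new IllegalStateException(message);
        }
    }

    public static void main(final String[] args) {
        failOnUnknownIds(new Object(), AssignmentError.NONE); // no log entry, no exception
        try {
            failOnUnknownIds(new Object(), AssignmentError.UNKNOWN_TASK_ID);
        } catch (final IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Building the message once and reusing it for both the log line and the exception keeps the two from drifting apart.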
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
));
return new DefaultApplicationState(
- assignmentConfigs.toPublicAssignmentConfigs(),
+ publicAssignmentConfigs,
logicalTasks,
clientMetadataMap
);
}
- private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
- final TaskAssignment taskAssignment) {
+ private static void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+ final TaskAssignment taskAssignment,
+ final AssignmentError assignmentError,
+ final Map<UUID, ClientMetadata> clientMetadataMap,
+ final GroupSubscription groupSubscription) {
+ if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+ assignor.onAssignmentComputed(null, groupSubscription, assignmentError);
+ throw new StreamsException("Task assignment with " + assignor.getClass() +
Review Comment:
```suggestion
throw new StreamsException("Task assignment with " + assignor.getClass().getName() +
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
));
return new DefaultApplicationState(
- assignmentConfigs.toPublicAssignmentConfigs(),
+ publicAssignmentConfigs,
logicalTasks,
clientMetadataMap
);
}
- private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
- final TaskAssignment taskAssignment) {
+ private static void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+ final TaskAssignment taskAssignment,
+ final AssignmentError assignmentError,
+ final Map<UUID, ClientMetadata> clientMetadataMap,
+ final GroupSubscription groupSubscription) {
+ if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+ assignor.onAssignmentComputed(null, groupSubscription, assignmentError);
Review Comment:
hm...I was originally thinking we should return an empty GroupAssignment
(that is, one with an empty map in it) in this edge case so that people don't
hit an NPE. Pretty sure that's what the javadocs I added to the callback say,
so we'd have to update those if we pass in null.

I can see the benefit of passing in `null`, since it helps bring to their
attention that something is "wrong". But I still worry about people just
NPE-ing out because of this. So I'm leaning towards passing in a `new
GroupAssignment(Collections.emptyMap())`. Thoughts?
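
The trade-off can be sketched as follows. This is an illustration only: `GroupAssignmentStandIn` is a hypothetical wrapper around a map, not the real `GroupAssignment` class, and `countAssignedMembers` is an invented example of a user callback written without a null check.

```java
import java.util.Collections;
import java.util.Map;

public class EmptyAssignmentSketch {
    // Hypothetical stand-in for the assignment object handed to the callback.
    static final class GroupAssignmentStandIn {
        private final Map<String, String> assignments;

        GroupAssignmentStandIn(final Map<String, String> assignments) {
            this.assignments = assignments;
        }

        Map<String, String> assignments() {
            return assignments;
        }
    }

    // A user callback body that does not guard against null.
    static int countAssignedMembers(final GroupAssignmentStandIn assignment) {
        return assignment.assignments().size();
    }

    public static void main(final String[] args) {
        // An empty assignment is handled without any special casing.
        final GroupAssignmentStandIn empty = new GroupAssignmentStandIn(Collections.emptyMap());
        System.out.println("members with empty assignment: " + countAssignedMembers(empty));

        // Passing null makes the same callback fail with a NullPointerException.
        try {
            countAssignedMembers(null);
        } catch (final NullPointerException e) {
            System.out.println("null assignment caused an NPE");
        }
    }
}
```

With the empty object, the error code passed alongside the assignment remains the signal that something went wrong, and callbacks that ignore it still run safely.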