ableegoldman commented on code in PR #16123:
URL: https://github.com/apache/kafka/pull/16123#discussion_r1619409187
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -720,7 +727,7 @@ private void checkAllPartitions(final Set<String>
allSourceTopics,
* Assigns a set of tasks to each client (Streams instance) using the
configured task assignor, and also
* populate the stateful tasks that have been assigned to the clients
*/
- private void assignTasksToClients(final Cluster fullMetadata,
+ private UserTaskAssignmentListener assignTasksToClients(final Cluster
fullMetadata,
Review Comment:
nit: fix formatting (indentation of the lower parameters)
Obviously you can address this in PR 9, no need to rerun the build on this PR
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster
fullMetadata,
final
Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>
userTaskAssignor =
userTaskAssignorSupplier.get();
+ UserTaskAssignmentListener userTaskAssignmentListener =
(GroupAssignment assignment, GroupSubscription subscription) -> { };
if (userTaskAssignor.isPresent()) {
final ApplicationState applicationState = buildApplicationState(
taskManager.topologyMetadata(),
clientMetadataMap,
topicGroups,
fullMetadata
);
- final TaskAssignment taskAssignment =
userTaskAssignor.get().assign(applicationState);
+ final org.apache.kafka.streams.processor.assignment.TaskAssignor
assignor = userTaskAssignor.get();
+ final TaskAssignment taskAssignment =
assignor.assign(applicationState);
processStreamsPartitionAssignment(clientMetadataMap,
taskAssignment);
+ final AssignmentError assignmentError =
validateTaskAssignment(applicationState, taskAssignment);
+ userTaskAssignmentListener = (GroupAssignment assignment,
GroupSubscription subscription) -> {
+ assignor.onAssignmentComputed(assignment, subscription,
assignmentError);
+ };
Review Comment:
nit: Streams coding style tip, use simplified lambdas, ie
```suggestion
userTaskAssignmentListener = (assignment, subscription) ->
assignor.onAssignmentComputed(assignment, subscription, assignmentError);
```
(I'm actually surprised checkstyle isn't complaining about this. Your IDE is
probably at least telling you to simplify it I would guess)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster
fullMetadata,
final
Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>
userTaskAssignor =
userTaskAssignorSupplier.get();
+ UserTaskAssignmentListener userTaskAssignmentListener =
(GroupAssignment assignment, GroupSubscription subscription) -> { };
Review Comment:
nit (address in PR 9): Streams coding style tip, prefer final variables
whenever possible. So this would be like:
```
final UserTaskAssignmentListener listener;
if (userTaskAssignor.isPresent() {
// do stuff
listener = ...;
else {
// do other stuff
listener = (a, s) -> { };
}
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultApplicationState.java:
##########
@@ -32,22 +33,29 @@
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-public class ApplicationStateImpl implements ApplicationState {
+public class DefaultApplicationState implements ApplicationState {
private final AssignmentConfigs assignmentConfigs;
- private final Set<TaskInfo> tasks;
+ private final Map<TaskId, TaskInfo> tasks;
private final Map<UUID, ClientMetadata> clientStates;
- public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
- final Set<TaskInfo> tasks,
- final Map<UUID, ClientMetadata> clientStates) {
+ private final Map<Boolean, Map<ProcessId, KafkaStreamsState>>
cachedKafkaStreamStates;
+
+ public DefaultApplicationState(final AssignmentConfigs assignmentConfigs,
+ final Set<TaskInfo> tasks,
+ final Map<UUID, ClientMetadata>
clientStates) {
this.assignmentConfigs = assignmentConfigs;
- this.tasks = unmodifiableSet(tasks);
+ this.tasks =
unmodifiableMap(tasks.stream().collect(Collectors.toMap(TaskInfo::id, task ->
task)));
Review Comment:
We can avoid adding yet another `.stream().collect() ` by building up the
map with the TaskIds as keys when we're building the `Set<TaskInfo> tasks`
parameter in the caller of this constructor.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultApplicationState.java:
##########
@@ -32,22 +33,29 @@
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-public class ApplicationStateImpl implements ApplicationState {
+public class DefaultApplicationState implements ApplicationState {
private final AssignmentConfigs assignmentConfigs;
- private final Set<TaskInfo> tasks;
+ private final Map<TaskId, TaskInfo> tasks;
private final Map<UUID, ClientMetadata> clientStates;
- public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
- final Set<TaskInfo> tasks,
- final Map<UUID, ClientMetadata> clientStates) {
+ private final Map<Boolean, Map<ProcessId, KafkaStreamsState>>
cachedKafkaStreamStates;
Review Comment:
nice, thanks for knocking this out as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]