ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619777550
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
);
LOG.info("Assignment before standby task optimization has cost {}",
initialCost);
- throw new UnsupportedOperationException("Not yet Implemented.");
+ final MoveStandbyTaskPredicate moveablePredicate =
getStandbyTaskMovePredicate(applicationState);
+ final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment,
List<TaskId>> getMovableTasks = (source, destination) -> {
+ return source.tasks().values().stream().filter(task -> task.type()
== AssignedTask.Type.STANDBY)
+ .filter(task -> {
+ return !destination.tasks().containsKey(task.id());
+ })
+ .filter(task -> {
+ final KafkaStreamsState sourceState =
kafkaStreamsStates.get(source.processId());
+ final KafkaStreamsState destinationState =
kafkaStreamsStates.get(source.processId());
+ return moveablePredicate.canMoveStandbyTask(sourceState,
destinationState, task.id(), kafkaStreamsAssignments);
+ })
+ .map(AssignedTask::id)
+ .sorted()
+ .collect(Collectors.toList());
+ };
+
+ final long startTime = System.currentTimeMillis();
+ boolean taskMoved = true;
+ int round = 0;
+ final RackAwareGraphConstructor<KafkaStreamsAssignment>
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+ while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+ taskMoved = false;
+ round++;
+ for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+ final UUID clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+ final UUID clientId2 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+ final String rack1 =
clientRacks.get(clientState1.processId().id()).get();
+ final String rack2 =
clientRacks.get(clientState2.processId().id()).get();
+ // Cross rack traffic can not be reduced if racks are the
same
+ if (rack1.equals(rack2)) {
+ continue;
+ }
+
+ final List<TaskId> movable1 =
getMovableTasks.apply(clientState1, clientState2);
+ final List<TaskId> movable2 =
getMovableTasks.apply(clientState2, clientState1);
+
+ // There's no needed to optimize if one is empty because
the optimization
+ // can only swap tasks to keep the client's load balanced
+ if (movable1.isEmpty() || movable2.isEmpty()) {
+ continue;
+ }
+
+ final List<TaskId> taskIdList =
Stream.concat(movable1.stream(),
+ movable2.stream())
Review Comment:
I noticed this is also messed up in the original code, but can you fix the
indentation and remove the line break here? (Only here — no need to fix it in
the RackAwareTaskAssignor as well.)
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
);
LOG.info("Assignment before standby task optimization has cost {}",
initialCost);
- throw new UnsupportedOperationException("Not yet Implemented.");
+ final MoveStandbyTaskPredicate moveablePredicate =
getStandbyTaskMovePredicate(applicationState);
+ final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment,
List<TaskId>> getMovableTasks = (source, destination) -> {
+ return source.tasks().values().stream().filter(task -> task.type()
== AssignedTask.Type.STANDBY)
+ .filter(task -> {
+ return !destination.tasks().containsKey(task.id());
+ })
+ .filter(task -> {
+ final KafkaStreamsState sourceState =
kafkaStreamsStates.get(source.processId());
+ final KafkaStreamsState destinationState =
kafkaStreamsStates.get(source.processId());
+ return moveablePredicate.canMoveStandbyTask(sourceState,
destinationState, task.id(), kafkaStreamsAssignments);
+ })
+ .map(AssignedTask::id)
+ .sorted()
+ .collect(Collectors.toList());
+ };
+
+ final long startTime = System.currentTimeMillis();
+ boolean taskMoved = true;
+ int round = 0;
+ final RackAwareGraphConstructor<KafkaStreamsAssignment>
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+ while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+ taskMoved = false;
+ round++;
+ for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+ final UUID clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+ final UUID clientId2 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+ final String rack1 =
clientRacks.get(clientState1.processId().id()).get();
+ final String rack2 =
clientRacks.get(clientState2.processId().id()).get();
+ // Cross rack traffic can not be reduced if racks are the
same
+ if (rack1.equals(rack2)) {
+ continue;
+ }
+
+ final List<TaskId> movable1 =
getMovableTasks.apply(clientState1, clientState2);
+ final List<TaskId> movable2 =
getMovableTasks.apply(clientState2, clientState1);
+
+ // There's no needed to optimize if one is empty because
the optimization
+ // can only swap tasks to keep the client's load balanced
+ if (movable1.isEmpty() || movable2.isEmpty()) {
+ continue;
+ }
+
+ final List<TaskId> taskIdList =
Stream.concat(movable1.stream(),
+ movable2.stream())
+ .sorted()
+ .collect(Collectors.toList());
+
+ final List<UUID> clients = Stream.of(clientId1, clientId2)
+ .sorted().collect(
+ Collectors.toList());
+
+ final AssignmentGraph assignmentGraph = buildTaskGraph(
+ assignmentsByUuid,
+ clientRacks,
+ taskIdList,
+ clients,
+ topicPartitionsByTaskId,
+ crossRackTrafficCost,
+ nonOverlapCost,
+ false,
+ false,
+ graphConstructor
+ );
+ assignmentGraph.graph.solveMinCostFlow();
+
+ taskMoved |= graphConstructor.assignTaskFromMinCostFlow(
+ assignmentGraph.graph,
+ clientIds,
+ taskIds,
+ assignmentsByUuid,
+ assignmentGraph.taskCountByClient,
+ assignmentGraph.clientByTask,
+ (assignment, taskId) -> assignment.assignTask(new
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
+ (assignment, taskId) -> assignment.removeTask(new
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
+ (assignment, taskId) ->
assignment.tasks().containsKey(taskId)
Review Comment:
```suggestion
(assignment, taskId) ->
assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type()
== AssignedTask.Type.STANDBY
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
);
LOG.info("Assignment before standby task optimization has cost {}",
initialCost);
- throw new UnsupportedOperationException("Not yet Implemented.");
+ final MoveStandbyTaskPredicate moveablePredicate =
getStandbyTaskMovePredicate(applicationState);
+ final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment,
List<TaskId>> getMovableTasks = (source, destination) -> {
+ return source.tasks().values().stream().filter(task -> task.type()
== AssignedTask.Type.STANDBY)
+ .filter(task -> {
+ return !destination.tasks().containsKey(task.id());
+ })
+ .filter(task -> {
+ final KafkaStreamsState sourceState =
kafkaStreamsStates.get(source.processId());
+ final KafkaStreamsState destinationState =
kafkaStreamsStates.get(source.processId());
+ return moveablePredicate.canMoveStandbyTask(sourceState,
destinationState, task.id(), kafkaStreamsAssignments);
+ })
+ .map(AssignedTask::id)
+ .sorted()
+ .collect(Collectors.toList());
+ };
+
+ final long startTime = System.currentTimeMillis();
+ boolean taskMoved = true;
+ int round = 0;
+ final RackAwareGraphConstructor<KafkaStreamsAssignment>
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+ while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+ taskMoved = false;
+ round++;
+ for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+ final UUID clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+ final UUID clientId2 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+ final String rack1 =
clientRacks.get(clientState1.processId().id()).get();
+ final String rack2 =
clientRacks.get(clientState2.processId().id()).get();
+ // Cross rack traffic can not be reduced if racks are the
same
+ if (rack1.equals(rack2)) {
+ continue;
+ }
+
+ final List<TaskId> movable1 =
getMovableTasks.apply(clientState1, clientState2);
+ final List<TaskId> movable2 =
getMovableTasks.apply(clientState2, clientState1);
+
+ // There's no needed to optimize if one is empty because
the optimization
+ // can only swap tasks to keep the client's load balanced
+ if (movable1.isEmpty() || movable2.isEmpty()) {
+ continue;
+ }
+
+ final List<TaskId> taskIdList =
Stream.concat(movable1.stream(),
+ movable2.stream())
+ .sorted()
+ .collect(Collectors.toList());
+
+ final List<UUID> clients = Stream.of(clientId1, clientId2)
+ .sorted().collect(
+ Collectors.toList());
Review Comment:
Same here: remove the stray line break before `Collectors.toList());`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
);
LOG.info("Assignment before standby task optimization has cost {}",
initialCost);
- throw new UnsupportedOperationException("Not yet Implemented.");
+ final MoveStandbyTaskPredicate moveablePredicate =
getStandbyTaskMovePredicate(applicationState);
+ final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment,
List<TaskId>> getMovableTasks = (source, destination) -> {
+ return source.tasks().values().stream().filter(task -> task.type()
== AssignedTask.Type.STANDBY)
+ .filter(task -> {
+ return !destination.tasks().containsKey(task.id());
+ })
+ .filter(task -> {
+ final KafkaStreamsState sourceState =
kafkaStreamsStates.get(source.processId());
+ final KafkaStreamsState destinationState =
kafkaStreamsStates.get(source.processId());
+ return moveablePredicate.canMoveStandbyTask(sourceState,
destinationState, task.id(), kafkaStreamsAssignments);
+ })
+ .map(AssignedTask::id)
+ .sorted()
+ .collect(Collectors.toList());
+ };
+
+ final long startTime = System.currentTimeMillis();
+ boolean taskMoved = true;
+ int round = 0;
+ final RackAwareGraphConstructor<KafkaStreamsAssignment>
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+ while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+ taskMoved = false;
+ round++;
+ for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+ final UUID clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+ final UUID clientId2 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+ final String rack1 =
clientRacks.get(clientState1.processId().id()).get();
+ final String rack2 =
clientRacks.get(clientState2.processId().id()).get();
+ // Cross rack traffic can not be reduced if racks are the
same
+ if (rack1.equals(rack2)) {
+ continue;
+ }
+
+ final List<TaskId> movable1 =
getMovableTasks.apply(clientState1, clientState2);
+ final List<TaskId> movable2 =
getMovableTasks.apply(clientState2, clientState1);
+
+ // There's no needed to optimize if one is empty because
the optimization
+ // can only swap tasks to keep the client's load balanced
+ if (movable1.isEmpty() || movable2.isEmpty()) {
+ continue;
+ }
+
+ final List<TaskId> taskIdList =
Stream.concat(movable1.stream(),
+ movable2.stream())
+ .sorted()
+ .collect(Collectors.toList());
+
+ final List<UUID> clients = Stream.of(clientId1, clientId2)
+ .sorted().collect(
+ Collectors.toList());
+
+ final AssignmentGraph assignmentGraph = buildTaskGraph(
+ assignmentsByUuid,
+ clientRacks,
+ taskIdList,
+ clients,
+ topicPartitionsByTaskId,
+ crossRackTrafficCost,
+ nonOverlapCost,
+ false,
+ false,
+ graphConstructor
+ );
+ assignmentGraph.graph.solveMinCostFlow();
+
+ taskMoved |= graphConstructor.assignTaskFromMinCostFlow(
+ assignmentGraph.graph,
+ clientIds,
+ taskIds,
+ assignmentsByUuid,
+ assignmentGraph.taskCountByClient,
+ assignmentGraph.clientByTask,
+ (assignment, taskId) -> assignment.assignTask(new
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
+ (assignment, taskId) -> assignment.removeTask(new
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
Review Comment:
```suggestion
(assignment, taskId) -> assignment.assignTask(new
AssignedTask(taskId, AssignedTask.Type.STANDBY)),
(assignment, taskId) -> assignment.removeTask(new
AssignedTask(taskId, AssignedTask.Type.STANDBY)),
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
);
LOG.info("Assignment before standby task optimization has cost {}",
initialCost);
- throw new UnsupportedOperationException("Not yet Implemented.");
+ final MoveStandbyTaskPredicate moveablePredicate =
getStandbyTaskMovePredicate(applicationState);
+ final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment,
List<TaskId>> getMovableTasks = (source, destination) -> {
+ return source.tasks().values().stream().filter(task -> task.type()
== AssignedTask.Type.STANDBY)
+ .filter(task -> {
+ return !destination.tasks().containsKey(task.id());
+ })
+ .filter(task -> {
+ final KafkaStreamsState sourceState =
kafkaStreamsStates.get(source.processId());
+ final KafkaStreamsState destinationState =
kafkaStreamsStates.get(source.processId());
+ return moveablePredicate.canMoveStandbyTask(sourceState,
destinationState, task.id(), kafkaStreamsAssignments);
+ })
+ .map(AssignedTask::id)
+ .sorted()
+ .collect(Collectors.toList());
+ };
+
+ final long startTime = System.currentTimeMillis();
+ boolean taskMoved = true;
+ int round = 0;
+ final RackAwareGraphConstructor<KafkaStreamsAssignment>
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+ while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+ taskMoved = false;
+ round++;
+ for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+ final UUID clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+ final UUID clientId2 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+ final String rack1 =
clientRacks.get(clientState1.processId().id()).get();
+ final String rack2 =
clientRacks.get(clientState2.processId().id()).get();
Review Comment:
Future cleanup: we can actually get rid of the clientRacks data structure
altogether and just use the rack ids we've already processed and organized
in the KafkaStreamsState, which is addressable by UUID, i.e.:
```suggestion
final String rack1 =
kafkaStreamsStates.get(clientState1.processId().id()).rackId().get();
final String rack2 =
kafkaStreamsStates.get(clientState2.processId().id()).rackId().get();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]