apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624625757


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,20 +32,47 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+        Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+
+        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (rackAwareTrafficCost == null) {
+                rackAwareTrafficCost = 
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
+            }
+            if (rackAwareNonOverlapCost == null) {
+                rackAwareNonOverlapCost = 
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
+            }
+        } else if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
+            // TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor 
class once it implements the new TaskAssignor interface
+            if (rackAwareTrafficCost == null) {
+                rackAwareTrafficCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST;
+            }
+            if (rackAwareNonOverlapCost == null) {
+                rackAwareNonOverlapCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST;
+            }
+        }
+
+        return new AssignmentConfigs(
+            acceptableRecoveryLag,
+            maxWarmupReplicas,
+            numStandbyReplicas,
+            probingRebalanceIntervalMs,
+            rackAwareAssignmentTags,
+            OptionalInt.of(rackAwareTrafficCost),
+            OptionalInt.of(rackAwareNonOverlapCost),

Review Comment:
   The only way to do this is with if/else; OptionalInt's API surface is pretty 
tragically lackluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to