absurdfarce commented on code in PR #1743:
URL: https://github.com/apache/cassandra-java-driver/pull/1743#discussion_r1628509853


##########
core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DistanceEvent.java:
##########
@@ -29,11 +31,16 @@
 @Immutable
 public class DistanceEvent {
   public final NodeDistance distance;
-  public final DefaultNode node;
+  private final WeakReference<DefaultNode> node;

Review Comment:
   Same as discussed in NodeStateEvent; I don't think we need a WeakReference 
here.



##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
   }
 
   protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
-    // response rate is considered insufficient when less than 2 responses were obtained in
-    // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
-    if (responseTimes.containsKey(node)) {
-      AtomicLongArray array = responseTimes.get(node);
-      if (array.length() == 2) {
-        long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
-        long leastRecent = array.get(0);
-        return leastRecent - threshold < 0;
-      }
+    NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+    if (sample == null) {
+      return true;

Review Comment:
   I'm wondering if this is too aggressive now that we're not guaranteed that 
historical node response time data will always be present.  I'm leaning towards 
an argument which says that we should only say a node's response time is 
insufficient if we have clear, unambiguous data which indicates that.  That 
means that missing data (either because it hasn't been observed yet or GC 
pressure has removed it now that we're using weak refs) would _not_ be enough 
to mark a node as unhealthy.
   
   @adutra I'm very interested in your take on this; you have considerably more 
context with which to evaluate this question than I do. :)
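   
   To make that concrete, here is a minimal sketch of the more lenient check. 
It is not the driver's code: `LenientResponseRateCheck`, the `String` node 
names and the `leastRecentResponse` map are hypothetical stand-ins, and only 
the comparison against the 200 ms interval is taken from the diff above.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;

   // Hypothetical stand-in for the policy; only the decision logic is sketched.
   class LenientResponseRateCheck {
     static final long RESET_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(200);

     // Assumption: node name -> timestamp (nanos) of the older of its last two responses.
     final Map<String, Long> leastRecentResponse = new ConcurrentHashMap<>();

     boolean isResponseRateInsufficient(String node, long now) {
       Long leastRecent = leastRecentResponse.get(node);
       if (leastRecent == null) {
         // Missing data (never observed, or evicted) is not evidence of a slow node.
         return false;
       }
       long threshold = now - RESET_INTERVAL_NANOS;
       // Same overflow-safe comparison as the removed code in the diff above.
       return leastRecent - threshold < 0;
     }

     public static void main(String[] args) {
       LenientResponseRateCheck check = new LenientResponseRateCheck();
       long now = System.nanoTime();
       // No data recorded yet for node1.
       System.out.println(check.isResponseRateInsufficient("node1", now));
       // Oldest tracked response is 300 ms old, outside the 200 ms interval.
       check.leastRecentResponse.put("node1", now - TimeUnit.MILLISECONDS.toNanos(300));
       System.out.println(check.isResponseRateInsufficient("node1", now));
     }
   }
   ```
   
   With this shape only a stale timestamp can mark a node as slow; an evicted 
or never-populated entry leaves it eligible.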



##########
core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateEvent.java:
##########
@@ -53,14 +55,19 @@ public static NodeStateEvent removed(DefaultNode node) {
    */
   public final NodeState newState;
 
-  public final DefaultNode node;
+  private final WeakReference<DefaultNode> node;

Review Comment:
   I don't think we need this change.  Events of this type are very 
short-lived; they exist to communicate information between driver components 
that don't necessarily know about each other.  They aren't stockpiled or stored 
in any meaningful way.  You address the problem that was originally reported by 
changing the distances map in LoadBalancingPolicyWrapper to use weak refs... 
it's not at all clear to me that making the events use weak references buys you 
much on top of that.
   
   Perhaps more importantly this change has the potential to make events a lot 
less useful.  The driver uses events to notify components about changes in 
nodes, but if the _actual node affected_ might not be present what use is the 
notification?  Components that receive events without node information have no 
choice but to ignore them which means (a) every receiver has to check for null 
node information and (b) if you just ignore events without node data (which all 
these receivers appear to do) you'll get a lot more indeterminate behaviour 
since the presence or absence of node data in events is basically a random 
value (since it's determined by GC pressure which is essentially random from 
the perspective of the driver).
   
   I note that the null checks referenced in (a) above are a big chunk of 
what's actually in this PR.  If not for this constraint the change set would be 
a lot smaller.  That wouldn't be the worst thing as the presence of all those 
null checks obscures the more significant changes in at least a moderate way.
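   
   To illustrate point (a), here is a minimal sketch of what every receiver 
ends up looking like once the node is held weakly. `FakeNode` and 
`WeakNodeEvent` are hypothetical stand-ins for DefaultNode and NodeStateEvent, 
and `simulateCollection()` is an assumption that stands in for the garbage 
collector clearing the reference.
   
   ```java
   import java.lang.ref.WeakReference;

   class WeakEventSketch {
     // Hypothetical, simplified stand-in for DefaultNode.
     static final class FakeNode {
       final String name;

       FakeNode(String name) {
         this.name = name;
       }
     }

     // Hypothetical, simplified stand-in for an event holding its node weakly.
     static final class WeakNodeEvent {
       private final WeakReference<FakeNode> node;

       WeakNodeEvent(FakeNode node) {
         this.node = new WeakReference<>(node);
       }

       // Returns null once the referent has been cleared.
       FakeNode getNode() {
         return node.get();
       }

       // Stands in for the garbage collector clearing the reference.
       void simulateCollection() {
         node.clear();
       }
     }

     // Every receiver needs this shape: check for null, then give up or proceed.
     static String handle(WeakNodeEvent event) {
       FakeNode node = event.getNode();
       if (node == null) {
         return "ignored";
       }
       return "handled " + node.name;
     }

     public static void main(String[] args) {
       FakeNode node = new FakeNode("node1");
       WeakNodeEvent event = new WeakNodeEvent(node);
       System.out.println(handle(event));
       event.simulateCollection();
       System.out.println(handle(event));
       System.out.println(node.name); // keeps the node strongly reachable until here
     }
   }
   ```
   
   The same event is handled or dropped depending only on whether the 
reference has been cleared, which is the indeterminacy described in (b).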



##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -96,14 +100,39 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
   private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
   private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);
 
-  protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
+  protected final LoadingCache<Node, NodeResponseRateSample> responseTimes;
   protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
   private final boolean avoidSlowReplicas;
 
   public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
     super(context, profileName);
     this.avoidSlowReplicas =
         profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
+    CacheLoader<Node, NodeResponseRateSample> cacheLoader =
+        new CacheLoader<Node, NodeResponseRateSample>() {
+          @Override
+          public NodeResponseRateSample load(Node key) {
+            NodeResponseRateSample sample = responseTimes.getIfPresent(key);
+            if (sample == null) {
+              sample = new NodeResponseRateSample();
+            } else {
+              sample.update();
+            }
+            return sample;
+          }
+        };
+    this.responseTimes =
+        CacheBuilder.newBuilder()
+            .weakKeys()
+            .removalListener(
+                (RemovalListener<Node, NodeResponseRateSample>)
+                    notification ->
+                        LOG.trace(
+                            "[{}] Evicting response times for {}: {}",
+                            logPrefix,
+                            notification.getKey(),
+                            notification.getCause()))

Review Comment:
   @aratno I know you asked for this RemovalListener in a [previous 
comment](https://github.com/apache/cassandra-java-driver/pull/1743#discussion_r1479160472)
 under the assumption that it would help identify cases in which a node was 
(incorrectly) marked unhealthy because data had expired from the map.  I'm 
going to argue instead that we shouldn't mark a node as unhealthy unless we 
have clear data indicating that it is so... which means the absence of response 
time data isn't enough to make it unhealthy.  If we take that approach would 
you still argue for a removal listener here?  I'm of two minds about it 
myself; I can see the benefit but it's possibly less interesting if the 
potentially damaging effects are mitigated.
   
   Thoughts?



##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -96,14 +100,39 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
   private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
   private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);
 
-  protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
+  protected final LoadingCache<Node, NodeResponseRateSample> responseTimes;
   protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
   private final boolean avoidSlowReplicas;
 
   public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
     super(context, profileName);
     this.avoidSlowReplicas =
         profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
+    CacheLoader<Node, NodeResponseRateSample> cacheLoader =
+        new CacheLoader<Node, NodeResponseRateSample>() {
+          @Override
+          public NodeResponseRateSample load(Node key) {
+            NodeResponseRateSample sample = responseTimes.getIfPresent(key);
+            if (sample == null) {
+              sample = new NodeResponseRateSample();
+            } else {
+              sample.update();
+            }
+            return sample;
+          }
+        };

Review Comment:
   You actually don't need a LoadingCache here; a simple Cache will do.  A 
LoadingCache is useful when you need to create an entry for a key that's been 
requested but isn't in the cache yet.  That's not what's going on here; your 
CacheLoader looks the key up in the cache itself, creates a new instance if it 
isn't present and updates the existing one otherwise.  You're not doing 
anything to create the NodeResponseRateSample instance beyond calling the 
constructor if necessary.
   
   You can accomplish the exact same thing using just a regular Cache by 
changing your update logic just a bit:
   
   ```java
     protected void updateResponseTimes(@NonNull Node node) {
       try {
         // Cache.get(key, loader) creates the entry on a miss; then update it
         responseTimes.get(node, NodeResponseRateSample::new).update();
       } catch (ExecutionException ee) {
         LOG.info("[{}] Exception updating node response times: {}", logPrefix, ee);
       }
     }
   ```
   
   I note that this approach is even 
[recommended](https://javadoc.io/static/com.google.guava/guava/33.2.1-jre/com/google/common/cache/Cache.html#put(K,V))
 in the Guava Javadoc.



##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
   }
 
   protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
-    // response rate is considered insufficient when less than 2 responses were obtained in
-    // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
-    if (responseTimes.containsKey(node)) {
-      AtomicLongArray array = responseTimes.get(node);
-      if (array.length() == 2) {
-        long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
-        long leastRecent = array.get(0);
-        return leastRecent - threshold < 0;
-      }
+    NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+    if (sample == null) {
+      return true;
+    } else {
+      return !sample.hasSufficientResponses(now);

Review Comment:
   Style nit: the ternary operator does this a bit more cleanly:
   
   ```java
   return (sample == null) ? true : !sample.hasSufficientResponses(now);
   ```



##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -318,4 +330,42 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) {
     // processing them).
     return (pool == null) ? 0 : pool.getInFlight();
   }
+
+  // Exposed as protected for unit tests
+  protected class NodeResponseRateSample {
+    // The array stores at most two timestamps, since we don't need more;
+    // the first one is always the least recent one, and hence the one to inspect.
+    protected AtomicLongArray times;
+
+    private NodeResponseRateSample() {
+      times = new AtomicLongArray(1);
+      times.set(0, nanoTime());
+    }
+
+    // Only for unit tests
+    protected NodeResponseRateSample(AtomicLongArray times) {
+      this.times = times;
+    }
+
+    private void update() {
+      long now = nanoTime();
+      if (times.length() == 1) {
+        long previous = times.get(0);
+        times = new AtomicLongArray(2);
+        times.set(0, previous);
+        times.set(1, now);
+      } else {
+        times.set(0, times.get(1));
+        times.set(1, now);
+      }
+    }

Review Comment:
   This seems subject to race conditions to me, although I haven't been 
able to construct a specific scenario that triggers one.  The old implementation 
computed a result array locally and then returned that; there was no changing 
of state at the ultimate destination (the old responseTimes map) beyond 
updating an entry or not doing so.  This implementation is doing more than 
that; we're actually replacing the array itself (this.times changes in 
size) based on data we receive.
   
   It might be okay... but then again I'm also not sure I see a harm in just 
making this.times be an array of size two from the beginning.
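   
   A minimal sketch of the fixed-size variant, under stated assumptions: 
`FixedSizeSample` and the `recorded` counter are hypothetical, the body of 
`hasSufficientResponses` is a guess that mirrors the comparison in the removed 
code, and timestamps are passed in explicitly to keep the example 
deterministic.  It only removes the reassignment of the array; the two-slot 
shift in `update` is still not atomic as a whole.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicLongArray;

   // Hypothetical variant of NodeResponseRateSample with an array that never changes size.
   class FixedSizeSample {
     // Always length 2 and never reassigned; slot 0 is the least recent timestamp.
     private final AtomicLongArray times = new AtomicLongArray(2);
     // Number of timestamps recorded so far, capped at 2 (hypothetical helper field).
     private final AtomicInteger recorded = new AtomicInteger();

     FixedSizeSample(long now) {
       times.set(1, now);
       recorded.set(1);
     }

     void update(long now) {
       // Shift the most recent timestamp into slot 0, then store the new one.
       times.set(0, times.get(1));
       times.set(1, now);
       recorded.updateAndGet(count -> Math.min(2, count + 1));
     }

     // Assumed semantics: sufficient when two responses fall within the interval.
     boolean hasSufficientResponses(long now, long intervalNanos) {
       if (recorded.get() < 2) {
         return false;
       }
       long threshold = now - intervalNanos;
       return times.get(0) - threshold >= 0;
     }

     public static void main(String[] args) {
       FixedSizeSample sample = new FixedSizeSample(0L);
       System.out.println(sample.hasSufficientResponses(100L, 200L)); // one response only
       sample.update(50L);
       System.out.println(sample.hasSufficientResponses(100L, 200L)); // two recent responses
     }
   }
   ```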



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

