adutra commented on code in PR #1743:
URL:
https://github.com/apache/cassandra-java-driver/pull/1743#discussion_r1631420991
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -318,4 +330,42 @@ protected int getInFlight(@NonNull Node node, @NonNull
Session session) {
// processing them).
return (pool == null) ? 0 : pool.getInFlight();
}
+
+ // Exposed as protected for unit tests
+ protected class NodeResponseRateSample {
+ // The array stores at most two timestamps, since we don't need more;
+ // the first one is always the least recent one, and hence the one to
inspect.
+ protected AtomicLongArray times;
+
+ private NodeResponseRateSample() {
+ times = new AtomicLongArray(1);
+ times.set(0, nanoTime());
+ }
+
+ // Only for unit tests
+ protected NodeResponseRateSample(AtomicLongArray times) {
+ this.times = times;
+ }
+
+ private void update() {
+ long now = nanoTime();
+ if (times.length() == 1) {
+ long previous = times.get(0);
+ times = new AtomicLongArray(2);
+ times.set(0, previous);
+ times.set(1, now);
+ } else {
+ times.set(0, times.get(1));
+ times.set(1, now);
+ }
+ }
Review Comment:
As I stated previously, this will race indeed, _unless_ calls to `update()`
are externally synchronized. I think the loading cache approach does guarantee
that.
The idea of using an array of size 2 from the beginning could save us one
array allocation – why not.
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull
Session session) {
}
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
- // response rate is considered insufficient when less than 2 responses
were obtained in
- // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
- if (responseTimes.containsKey(node)) {
- AtomicLongArray array = responseTimes.get(node);
- if (array.length() == 2) {
- long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
- long leastRecent = array.get(0);
- return leastRecent - threshold < 0;
- }
+ NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+ if (sample == null) {
+ return true;
+ } else {
+ return !sample.hasSufficientResponses(now);
Review Comment:
Or even:
```java
return sample == null || !sample.hasSufficientResponses(now)
```
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -318,4 +330,42 @@ protected int getInFlight(@NonNull Node node, @NonNull
Session session) {
// processing them).
return (pool == null) ? 0 : pool.getInFlight();
}
+
+ // Exposed as protected for unit tests
+ protected class NodeResponseRateSample {
+ // The array stores at most two timestamps, since we don't need more;
+ // the first one is always the least recent one, and hence the one to
inspect.
+ protected AtomicLongArray times;
+
+ private NodeResponseRateSample() {
+ times = new AtomicLongArray(1);
+ times.set(0, nanoTime());
+ }
+
+ // Only for unit tests
Review Comment:
Maybe annotate with `@VisibleForTesting`?
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -96,14 +100,39 @@ public class DefaultLoadBalancingPolicy extends
BasicLoadBalancingPolicy impleme
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS =
MILLISECONDS.toNanos(200);
- protected final Map<Node, AtomicLongArray> responseTimes = new
ConcurrentHashMap<>();
+ protected final LoadingCache<Node, NodeResponseRateSample> responseTimes;
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
private final boolean avoidSlowReplicas;
public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull
String profileName) {
super(context, profileName);
this.avoidSlowReplicas =
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE,
true);
+ CacheLoader<Node, NodeResponseRateSample> cacheLoader =
+ new CacheLoader<Node, NodeResponseRateSample>() {
+ @Override
+ public NodeResponseRateSample load(Node key) {
+ NodeResponseRateSample sample = responseTimes.getIfPresent(key);
+ if (sample == null) {
+ sample = new NodeResponseRateSample();
+ } else {
+ sample.update();
+ }
+ return sample;
+ }
+ };
Review Comment:
Isn't your version subject to race conditions though. In the loading cache
version, `NodeResponseRateSample.update()` is called inside
`CacheLoader.load()` and thus cannot be called concurrently. In your
suggestion, `update()` is called _after_ the value is retrieved from the cache,
and thus could be executed concurrently.
##########
core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java:
##########
@@ -274,40 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull
Session session) {
}
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
- // response rate is considered insufficient when less than 2 responses
were obtained in
- // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
- if (responseTimes.containsKey(node)) {
- AtomicLongArray array = responseTimes.get(node);
- if (array.length() == 2) {
- long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
- long leastRecent = array.get(0);
- return leastRecent - threshold < 0;
- }
+ NodeResponseRateSample sample = responseTimes.getIfPresent(node);
+ if (sample == null) {
+ return true;
Review Comment:
Yes, I tend to agree that the absence of any samples should be interpreted
as "we don't know".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]