dimitarndimitrov commented on code in PR #17221:
URL: https://github.com/apache/kafka/pull/17221#discussion_r1771776869
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
Review Comment:
Ah, thanks for raising that. I actually considered this, and there are two
reasons why I didn't go with a barrier:
1. The latch is more familiar to people - e.g. in the AK codebase I can see
>100 matches in >30 files for `CountDownLatch` and <10 matches in only 2 files
for `CyclicBarrier`.
2. On my local machine the latch is also faster, resulting in a local run of
the test between 400 and 500 ms, while the barrier results in a run between 500
and 700 ms, regardless of whether the barrier is reused and reset. This matches
my previous experience and can be explained by the fact that the barrier takes
an explicit lock, while the latch uses AQS (the main building block of the
various locking and synchronization utilities the JDK provides) directly.
    - Note that we could also use a `Phaser` here; while it's faster than the
barrier, it's even less well-known than the barrier, it requires 2 API calls
for a timed wait, and it's still not quite as fast as the latch.

Just to clarify, what we are doing here is a canonical usage of a
`CountDownLatch` - signalling the worker threads from the external test thread.
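
To make the "2 API calls" point concrete, here is a minimal, self-contained sketch (class and method names are mine, not from the PR; the returned count is a stand-in for `hdrHistogram.count(...)`) of how the same signalling would look with a `Phaser` timed wait:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserTimedWaitSketch {
    // Hypothetical worker: waits (with a timeout) for the test thread's signal.
    static long awaitSignalAndCount(Phaser phaser) {
        try {
            // Two API calls are needed for a timed wait: arrive() registers
            // this party's arrival, then awaitAdvanceInterruptibly(phase,
            // timeout, unit) blocks until the phase advances or times out.
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), 500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
        return 2L; // stand-in for hdrHistogram.count(moreThanMaxAge)
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 3 parties: the 2 worker threads plus the signalling test thread.
        Phaser phaser = new Phaser(3);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<Long> f1 = pool.submit(() -> awaitSignalAndCount(phaser));
        Future<Long> f2 = pool.submit(() -> awaitSignalAndCount(phaser));
        phaser.arrive(); // third arrival advances the phase, analogous to latch.countDown()
        System.out.println(f1.get() + " " + f2.get());
        pool.shutdown();
    }
}
```

With the latch, the same release point is a single `latch.countDown()` on the test thread and a single `latch.await(timeout, unit)` per worker.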
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+                    return hdrHistogram.count(moreThanMaxAge);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+            Future<Long> t1Future = countExecutor.submit(countTask);
+            Future<Long> t2Future = countExecutor.submit(countTask);
+            latch.countDown();
+            long t1Count = t1Future.get();
+            long t2Count = t2Future.get();
+            assertTrue(
+                numEvents == t1Count && numEvents == t2Count,
Review Comment:
> actually, is this what we want? my understanding is that if we have two
threads concurrently calling latestHistogram, we want the first thread to reset
the recorder and return the previous histogram where the other thread should
return the reset-ed histogram.
Yes, this is what we want for this particular test. We want the threads to
race for the snapshot reset, we want the thread that wins the race to be
guaranteed to reset the snapshot, and we want the thread that loses the race to
be guaranteed to accept the reset snapshot and not to reset the snapshot itself.
> i may be wrong but both thread counts returning 2 doesn't seem right
It is actually the correct behavior given the current API. We read from both
threads using the same timestamp, which is why the last condition I mentioned
above is guaranteed - that the thread that loses the race will not reset the
snapshot itself.
I think some confusion might be arising from the fact that the histogram
relies on user thread collaboration for snapshot freshness (e.g. like Guava
Cache implementations rely on user thread collaboration for entry expiration).
So if no reads are performed, the actual age of the histogram snapshot is
effectively unbounded.
    - We don't really want to do that maintenance on the write path, and while
it's possible to do it separately and not rely on user threads (e.g. the same
way later versions of Caffeine, a Guava Cache successor, allow specifying an
executor/scheduler for entry expiration), I don't think this is a necessary
feature for the wrapper.
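
To illustrate the read-path collaboration described above, here is a minimal, hypothetical sketch (names and values are mine, not the actual `HdrHistogram` wrapper code): stale readers race via a CAS on the snapshot timestamp, the winner resets the snapshot, and the loser accepts the freshly reset one rather than resetting again:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ReadPathRefreshSketch {
    static final long MAX_SNAPSHOT_AGE_MS = 10L;

    static final AtomicLong snapshotTimestamp = new AtomicLong(0L);
    static volatile long snapshotValue = 0L;

    // Called on the read path with the caller's notion of "now".
    static long latestSnapshot(long now) {
        long last = snapshotTimestamp.get();
        if (now - last > MAX_SNAPSHOT_AGE_MS
                && snapshotTimestamp.compareAndSet(last, now)) {
            // Winner of the race: take a fresh snapshot (stand-in value here).
            snapshotValue = now;
        }
        // Losers (and the winner) return the current snapshot. Callers using
        // the same timestamp can never reset twice, which is why both test
        // threads above see the same count. And if no one reads, nothing
        // refreshes: the snapshot's age is unbounded.
        return snapshotValue;
    }

    public static void main(String[] args) {
        long first = latestSnapshot(100L);  // stale -> this call resets
        long second = latestSnapshot(100L); // same timestamp -> no reset
        System.out.println(first + " " + second);
    }
}
```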
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]