This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f83fbc67234 CAMEL-20705: fix non-atomic operations on volatile fields
on the ScheduledPollConsumer
f83fbc67234 is described below
commit f83fbc67234976285b40e4fe01eda35c96113421
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Apr 23 13:27:10 2024 +0200
CAMEL-20705: fix non-atomic operations on volatile fields on the
ScheduledPollConsumer
---
.../impl/ScheduledPollConsumerBackoffTest.java | 10 ++--
.../camel/support/ScheduledPollConsumer.java | 70 ++++++++++++----------
2 files changed, 43 insertions(+), 37 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
index 90c455d655d..6e4eb2d4b49 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
@@ -45,7 +45,7 @@ public class ScheduledPollConsumerBackoffTest extends
ContextTestSupport {
consumer.run();
consumer.run();
consumer.run();
- assertEquals(2, commits);
+ assertEquals(3, commits);
// and now we poll again
consumer.run();
consumer.run();
@@ -55,9 +55,9 @@ public class ScheduledPollConsumerBackoffTest extends
ContextTestSupport {
consumer.run();
consumer.run();
consumer.run();
- assertEquals(4, commits);
+ assertEquals(6, commits);
consumer.run();
- assertEquals(5, commits);
+ assertEquals(6, commits);
consumer.stop();
}
@@ -78,7 +78,7 @@ public class ScheduledPollConsumerBackoffTest extends
ContextTestSupport {
consumer.run();
consumer.run();
consumer.run();
- assertEquals(3, errors);
+ assertEquals(4, errors);
// and now we poll again
consumer.run();
consumer.run();
@@ -89,7 +89,7 @@ public class ScheduledPollConsumerBackoffTest extends
ContextTestSupport {
consumer.run();
consumer.run();
consumer.run();
- assertEquals(6, errors);
+ assertEquals(8, errors);
consumer.stop();
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 7081fa6c7f5..98e26fb6b26 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Component;
@@ -72,10 +73,10 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
// state during running
private volatile boolean polling;
- private volatile int backoffCounter;
- private volatile long idleCounter;
- private volatile long errorCounter;
- private volatile long successCounter;
+ private final AtomicInteger backoffCounter = new AtomicInteger();
+ private final AtomicLong idleCounter = new AtomicLong();
+ private final AtomicLong errorCounter = new AtomicLong();
+ private final AtomicLong successCounter = new AtomicLong();
private volatile Throwable lastError;
private volatile Map<String, Object> lastErrorDetails;
private final AtomicLong counter = new AtomicLong();
@@ -147,24 +148,25 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
// should we back off if it's enabled, and either the idle or error
counter is >= the threshold
if (backoffMultiplier > 0
// either idle or error threshold could be not in use, so
check for that and use MAX_VALUE if not in use
- && idleCounter >= (backoffIdleThreshold > 0 ?
backoffIdleThreshold : Integer.MAX_VALUE)
- || errorCounter >= (backoffErrorThreshold > 0 ?
backoffErrorThreshold : Integer.MAX_VALUE)) {
- if (backoffCounter++ < backoffMultiplier) {
+ && idleCounter.longValue() >= (backoffIdleThreshold > 0 ?
backoffIdleThreshold : Integer.MAX_VALUE)
+ || errorCounter.longValue() >= (backoffErrorThreshold > 0 ?
backoffErrorThreshold : Integer.MAX_VALUE)) {
+ final int currentBackoffCounter = backoffCounter.incrementAndGet();
+ if (currentBackoffCounter < backoffMultiplier) {
// yes we should backoff
- if (idleCounter > 0) {
- LOG.debug("doRun() backoff due subsequent {} idles
(backoff at {}/{})", idleCounter, backoffCounter,
- backoffMultiplier);
+ if (idleCounter.intValue() > 0) {
+ LOG.debug("doRun() backoff due subsequent {} idles
(backoff at {}/{})", idleCounter.longValue(),
+ backoffCounter.intValue(), backoffMultiplier);
} else {
- LOG.debug("doRun() backoff due subsequent {} errors
(backoff at {}/{})", errorCounter, backoffCounter,
- backoffMultiplier);
+ LOG.debug("doRun() backoff due subsequent {} errors
(backoff at {}/{})", errorCounter.intValue(),
+ backoffCounter.intValue(), backoffMultiplier);
}
return;
} else {
// we are finished with backoff so reset counters
- idleCounter = 0;
- errorCounter = 0;
- backoffCounter = 0;
- successCounter = 0;
+ idleCounter.set(0);
+ errorCounter.set(0);
+ backoffCounter.set(0);
+ successCounter.set(0);
LOG.trace("doRun() backoff finished, resetting counters.");
}
}
@@ -218,7 +220,7 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
// clear any error that might be since we have
successfully polled, otherwise readiness checks might believe the
// consumer to be unhealthy
- errorCounter = 0;
+ errorCounter.set(0);
lastError = null;
lastErrorDetails = null;
@@ -264,9 +266,9 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
}
if (cause != null) {
- idleCounter = 0;
- successCounter = 0;
- errorCounter++;
+ idleCounter.set(0);
+ successCounter.set(0);
+ errorCounter.incrementAndGet();
lastError = cause;
// enrich last error with http response code if possible
if (cause instanceof HttpResponseAware) {
@@ -276,9 +278,13 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
}
}
} else {
- idleCounter = polledMessages == 0 ? ++idleCounter : 0;
- successCounter++;
- errorCounter = 0;
+ if (polledMessages == 0) {
+ idleCounter.incrementAndGet();
+ } else {
+ idleCounter.set(0);
+ }
+ successCounter.incrementAndGet();
+ errorCounter.set(0);
lastError = null;
lastErrorDetails = null;
}
@@ -286,8 +292,8 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
// now first poll is done after the poll is complete
firstPollDone = true;
- LOG.trace("doRun() done with idleCounter={}, successCounter={},
errorCounter={}", idleCounter, successCounter,
- errorCounter);
+ LOG.trace("doRun() done with idleCounter={}, successCounter={},
errorCounter={}", idleCounter.longValue(),
+ successCounter.longValue(), errorCounter.longValue());
// prevent this thread from throwing exceptions because the thread pool won't
re-schedule a new thread
}
@@ -406,7 +412,7 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
}
public int getBackoffCounter() {
- return backoffCounter;
+ return backoffCounter.intValue();
}
public int getBackoffMultiplier() {
@@ -460,7 +466,7 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
* @see #getSuccessCounter()
*/
public long getErrorCounter() {
- return errorCounter;
+ return errorCounter.longValue();
}
/**
@@ -470,7 +476,7 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
* @see #getErrorCounter()
*/
public long getSuccessCounter() {
- return successCounter;
+ return successCounter.longValue();
}
/**
@@ -693,10 +699,10 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
}
// clear counters
- backoffCounter = 0;
- idleCounter = 0;
- errorCounter = 0;
- successCounter = 0;
+ backoffCounter.set(0);
+ idleCounter.set(0);
+ errorCounter.set(0);
+ successCounter.set(0);
counter.set(0);
// clear ready state
firstPollDone = false;