Repository: camel Updated Branches: refs/heads/master b70a69fd1 -> 79e77d9c1
CAMEL-11148: fixing backpressure strategies Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/79e77d9c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/79e77d9c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/79e77d9c Branch: refs/heads/master Commit: 79e77d9c1965a20772437afdf4b201718b9f9bbf Parents: b70a69f Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Thu Apr 13 17:02:55 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Thu Apr 13 17:03:04 2017 +0200 ---------------------------------------------------------------------- .../ReactiveStreamsBackpressureStrategy.java | 50 ++++++++------------ .../streams/BackpressureStrategyTest.java | 4 +- 2 files changed, 23 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/79e77d9c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java index fe23866..af3118a 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java @@ -30,22 +30,28 @@ public enum ReactiveStreamsBackpressureStrategy { */ BUFFER { @Override - public <T> Collection<T> update(Deque<T> buffer, T element) { - buffer.addLast(element); + public <T> Collection<T> update(Deque<T> buffer, T newItem) { + // always 
buffer + buffer.addLast(newItem); + // never discard return Collections.emptySet(); } }, /** - * Drops the most recent onNext value if the downstream can't keep up. + * Keeps only the oldest onNext value, discarding any future value + * until it's consumed by the downstream subscriber. */ - DROP { + OLDEST { @Override - public <T> Collection<T> update(Deque<T> buffer, T element) { + public <T> Collection<T> update(Deque<T> buffer, T newItem) { if (buffer.size() > 0) { - return Collections.singletonList(element); + // the buffer has another item, so discarding the incoming one + return Collections.singletonList(newItem); } else { - buffer.addLast(element); + // add the new item to the buffer, since it was empty + buffer.addLast(newItem); + // nothing is discarded return Collections.emptySet(); } } @@ -57,30 +63,16 @@ public enum ReactiveStreamsBackpressureStrategy { */ LATEST { @Override - public <T> Collection<T> update(Deque<T> buffer, T element) { - Collection<T> discarded = Collections.emptySet(); - if (buffer.size() > 0) { - discarded = Collections.singletonList(buffer.removeLast()); - } - - buffer.addLast(element); - return discarded; - } - }, - - /** - * Keeps only the oldest onNext value, overwriting any previous value if the - * downstream can't keep up. - */ - OLDEST { - @Override - public <T> Collection<T> update(Deque<T> buffer, T element) { + public <T> Collection<T> update(Deque<T> buffer, T newItem) { Collection<T> discarded = Collections.emptySet(); if (buffer.size() > 0) { + // there should be an item in the buffer, + // so removing it to overwrite discarded = Collections.singletonList(buffer.removeFirst()); } - - buffer.addLast(element); + // add the new item to the buffer + // (it should be the only item in the buffer now) + buffer.addLast(newItem); return discarded; } }; @@ -89,10 +81,10 @@ public enum ReactiveStreamsBackpressureStrategy { * Updates the buffer and returns a list of discarded elements (if any). 
* * @param buffer the buffer to update - * @param element the elment that should possibly be inserted + * @param newItem the element that should possibly be inserted * @param <T> the generic type of the element * @return the list of discarded elements */ - public abstract <T> Collection<T> update(Deque<T> buffer, T element); + public abstract <T> Collection<T> update(Deque<T> buffer, T newItem); } http://git-wip-us.apache.org/repos/asf/camel/blob/79e77d9c/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java index 20f4119..029efb7 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java @@ -69,7 +69,7 @@ public class BackpressureStrategyTest extends CamelTestSupport { public void testBackpressureDropStrategy() throws Exception { ReactiveStreamsComponent comp = (ReactiveStreamsComponent) context().getComponent("reactive-streams"); - comp.setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.DROP); + comp.setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.OLDEST); new RouteBuilder() { @Override @@ -164,7 +164,7 @@ public class BackpressureStrategyTest extends CamelTestSupport { public void configure() throws Exception { from("timer:gen?period=20&repeatCount=20") .setBody().header(Exchange.TIMER_COUNTER) - .to("reactive-streams:integers?backpressureStrategy=DROP"); + .to("reactive-streams:integers?backpressureStrategy=OLDEST"); } 
}.addRoutesToCamelContext(context);