Repository: camel
Updated Branches:
  refs/heads/master b70a69fd1 -> 79e77d9c1


CAMEL-11148: fixing backpressure strategies


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/79e77d9c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/79e77d9c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/79e77d9c

Branch: refs/heads/master
Commit: 79e77d9c1965a20772437afdf4b201718b9f9bbf
Parents: b70a69f
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Thu Apr 13 17:02:55 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Thu Apr 13 17:03:04 2017 +0200

----------------------------------------------------------------------
 .../ReactiveStreamsBackpressureStrategy.java    | 50 ++++++++------------
 .../streams/BackpressureStrategyTest.java       |  4 +-
 2 files changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/79e77d9c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
index fe23866..af3118a 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
@@ -30,22 +30,28 @@ public enum ReactiveStreamsBackpressureStrategy {
      */
     BUFFER {
         @Override
-        public <T> Collection<T> update(Deque<T> buffer, T element) {
-            buffer.addLast(element);
+        public <T> Collection<T> update(Deque<T> buffer, T newItem) {
+            // always buffer
+            buffer.addLast(newItem);
+            // never discard
             return Collections.emptySet();
         }
     },
 
     /**
-     * Drops the most recent onNext value if the downstream can't keep up.
+     * Keeps only the oldest onNext value, discarding any future value
+     * until it's consumed by the downstream subscriber.
      */
-    DROP {
+    OLDEST {
         @Override
-        public <T> Collection<T> update(Deque<T> buffer, T element) {
+        public <T> Collection<T> update(Deque<T> buffer, T newItem) {
             if (buffer.size() > 0) {
-                return Collections.singletonList(element);
+                // the buffer has another item, so discarding the incoming one
+                return Collections.singletonList(newItem);
             } else {
-                buffer.addLast(element);
+                // add the new item to the buffer, since it was empty
+                buffer.addLast(newItem);
+                // nothing is discarded
                 return Collections.emptySet();
             }
         }
@@ -57,30 +63,16 @@ public enum ReactiveStreamsBackpressureStrategy {
      */
     LATEST {
         @Override
-        public <T> Collection<T> update(Deque<T> buffer, T element) {
-            Collection<T> discarded = Collections.emptySet();
-            if (buffer.size() > 0) {
-                discarded = Collections.singletonList(buffer.removeLast());
-            }
-
-            buffer.addLast(element);
-            return discarded;
-        }
-    },
-
-    /**
-     * Keeps only the oldest onNext value, overwriting any previous value if 
the
-     * downstream can't keep up.
-     */
-    OLDEST {
-        @Override
-        public <T> Collection<T> update(Deque<T> buffer, T element) {
+        public <T> Collection<T> update(Deque<T> buffer, T newItem) {
             Collection<T> discarded = Collections.emptySet();
             if (buffer.size() > 0) {
+                // there should be an item in the buffer,
+                // so removing it to overwrite
                 discarded = Collections.singletonList(buffer.removeFirst());
             }
-
-            buffer.addLast(element);
+            // add the new item to the buffer
+            // (it should be the only item in the buffer now)
+            buffer.addLast(newItem);
             return discarded;
         }
     };
@@ -89,10 +81,10 @@ public enum ReactiveStreamsBackpressureStrategy {
      * Updates the buffer and returns a list of discarded elements (if any).
      *
      * @param buffer the buffer to update
-     * @param element the elment that should possibly be inserted
+     * @param newItem the elment that should possibly be inserted
      * @param <T> the generic type of the element
      * @return the list of discarded elements
      */
-    public abstract <T> Collection<T> update(Deque<T> buffer, T element);
+    public abstract <T> Collection<T> update(Deque<T> buffer, T newItem);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/79e77d9c/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
index 20f4119..029efb7 100644
--- 
a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
+++ 
b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
@@ -69,7 +69,7 @@ public class BackpressureStrategyTest extends 
CamelTestSupport {
     public void testBackpressureDropStrategy() throws Exception {
 
         ReactiveStreamsComponent comp = (ReactiveStreamsComponent) 
context().getComponent("reactive-streams");
-        comp.setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.DROP);
+        
comp.setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.OLDEST);
 
         new RouteBuilder() {
             @Override
@@ -164,7 +164,7 @@ public class BackpressureStrategyTest extends 
CamelTestSupport {
             public void configure() throws Exception {
                 from("timer:gen?period=20&repeatCount=20")
                         .setBody().header(Exchange.TIMER_COUNTER)
-                        
.to("reactive-streams:integers?backpressureStrategy=DROP");
+                        
.to("reactive-streams:integers?backpressureStrategy=OLDEST");
             }
         }.addRoutesToCamelContext(context);
 

Reply via email to