Repository: camel Updated Branches: refs/heads/master 872082312 -> c3b6595f9
camel-reactive-streams: Fix latest strategy which was mistakenly acting as oldest. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3b6595f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3b6595f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3b6595f Branch: refs/heads/master Commit: c3b6595f94d82cb414e50752b5555b5ab2b7970c Parents: 8720823 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Apr 9 14:32:12 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Apr 9 14:32:12 2017 +0200 ---------------------------------------------------------------------- .../ReactiveStreamsBackpressureStrategy.java | 23 ++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c3b6595f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java index db46915..fe23866 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.reactive.streams; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Deque; @@ -44,7 +43,7 @@ public enum ReactiveStreamsBackpressureStrategy { @Override public <T> Collection<T> update(Deque<T> buffer, T element) { if (buffer.size() > 0) { - return Arrays.asList(element); + return Collections.singletonList(element); } else { buffer.addLast(element); return Collections.emptySet(); @@ -61,14 +60,30 @@ public enum ReactiveStreamsBackpressureStrategy { public <T> Collection<T> update(Deque<T> buffer, T element) { Collection<T> discarded = Collections.emptySet(); if (buffer.size() > 0) { - discarded = Arrays.asList(buffer.removeFirst()); + discarded = Collections.singletonList(buffer.removeLast()); } buffer.addLast(element); return discarded; } - }; + }, + /** + * Keeps only the oldest onNext value, overwriting any previous value if the + * downstream can't keep up. + */ + OLDEST { + @Override + public <T> Collection<T> update(Deque<T> buffer, T element) { + Collection<T> discarded = Collections.emptySet(); + if (buffer.size() > 0) { + discarded = Collections.singletonList(buffer.removeFirst()); + } + + buffer.addLast(element); + return discarded; + } + }; /** * Updates the buffer and returns a list of discarded elements (if any).