Repository: camel Updated Branches: refs/heads/master 96bbc91cf -> 872082312
CAMEL-11124: camel-reactive-streams - Add exception for discarded streams. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87208231 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87208231 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87208231 Branch: refs/heads/master Commit: 87208231252a330fc0f3c728a6f3358702c326db Parents: 96bbc91 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Apr 9 13:55:26 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Apr 9 13:55:26 2017 +0200 ---------------------------------------------------------------------- .../main/docs/reactive-streams-component.adoc | 4 +-- .../streams/ReactiveStreamsComponent.java | 3 ++ .../ReactiveStreamsDiscardedException.java | 35 ++++++++++++++++++++ .../reactive/streams/engine/CamelPublisher.java | 2 +- .../streams/engine/CamelSubscription.java | 12 +++++-- .../ReactiveStreamsComponentConfiguration.java | 3 +- 6 files changed, 53 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc index 0c6fc8a..07535d6 100644 --- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc +++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc @@ -51,8 +51,8 @@ The Reactive Streams component supports 3 options which are listed below. [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= | Name | Description | Default | Type -| **internalEngine Configuration** (common) | Configures the internal engine for Reactive Streams. | | ReactiveStreamsEngine Configuration -| **backpressureStrategy** (common) | The backpressure strategy to use when pushing events to a slow subscriber. | | ReactiveStreams BackpressureStrategy +| **internalEngine Configuration** (advanced) | Configures the internal engine for Reactive Streams. | | ReactiveStreamsEngine Configuration +| **backpressureStrategy** (producer) | The backpressure strategy to use when pushing events to a slow subscriber. | BUFFER | ReactiveStreams BackpressureStrategy | **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean |======================================================================= // component options: END http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java index 5801877..a80f648 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java @@ -21,14 +21,17 @@ import java.util.Map; import org.apache.camel.Endpoint; import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration; import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.spi.Metadata; /** * The Camel reactive-streams component. */ public class ReactiveStreamsComponent extends UriEndpointComponent { + @Metadata(label = "advanced") private ReactiveStreamsEngineConfiguration internalEngineConfiguration = new ReactiveStreamsEngineConfiguration(); + @Metadata(label = "producer", defaultValue = "BUFFER") private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER; public ReactiveStreamsComponent() { http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java new file mode 100644 index 0000000..0ad811a --- /dev/null +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +public class ReactiveStreamsDiscardedException extends CamelExchangeException { + + private final String name; + + public ReactiveStreamsDiscardedException(String message, Exchange exchange, String name) { + super(message, exchange); + this.name = name; + } + + public String getName() { + return name; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java index f90f19c..4544b7d 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java @@ -62,7 +62,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC @Override public void subscribe(Subscriber<? super StreamPayload<Exchange>> subscriber) { Objects.requireNonNull(subscriber, "subscriber must not be null"); - CamelSubscription sub = new CamelSubscription(workerPool, this, this.backpressureStrategy, subscriber); + CamelSubscription sub = new CamelSubscription(workerPool, this, name, this.backpressureStrategy, subscriber); this.subscriptions.add(sub); subscriber.onSubscribe(sub); } http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java index 431ca6d..60b42a3 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy; +import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -43,6 +44,8 @@ public class CamelSubscription implements Subscription { private ExecutorService workerPool; + private String name; + private CamelPublisher publisher; private ReactiveStreamsBackpressureStrategy backpressureStrategy; @@ -78,9 +81,12 @@ public class CamelSubscription implements Subscription { private boolean sending; - public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, ReactiveStreamsBackpressureStrategy backpressureStrategy, Subscriber<? super StreamPayload<Exchange>> subscriber) { + public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, String name, + ReactiveStreamsBackpressureStrategy backpressureStrategy, + Subscriber<? super StreamPayload<Exchange>> subscriber) { this.workerPool = workerPool; this.publisher = publisher; + this.name = name; this.backpressureStrategy = backpressureStrategy; this.subscriber = subscriber; } @@ -232,7 +238,9 @@ public class CamelSubscription implements Subscription { if (discardedMessages != null) { for (Map.Entry<StreamPayload<Exchange>, String> discarded : discardedMessages.entrySet()) { StreamPayload<Exchange> m = discarded.getKey(); - m.getCallback().processed(m.getItem(), new IllegalStateException(discarded.getValue())); + Exchange exchange = m.getItem(); + ReactiveStreamsDiscardedException e = new ReactiveStreamsDiscardedException("Discarded by backpressure strategy", exchange, name); + m.getCallback().processed(exchange, e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java index b33267f..5fb6dd4 100644 --- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.reactive.streams.springboot; import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy; +import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; import org.springframework.boot.context.properties.ConfigurationProperties; /** @@ -35,7 +36,7 @@ public class ReactiveStreamsComponentConfiguration { * The backpressure strategy to use when pushing events to a slow * subscriber. */ - private ReactiveStreamsBackpressureStrategy backpressureStrategy; + private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER; /** * Whether the component should resolve property placeholders on itself when * starting. Only properties which are of String type can use property