This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a69abb0  CAMEL-14568 Fixed reactive streams
a69abb0 is described below

commit a69abb0422cba1ba254ab2faa54e10076e06bda7
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Mar 2 09:05:38 2020 +0100

    CAMEL-14568 Fixed reactive streams
---
 .../ReactiveStreamsComponentConfigurer.java        | 10 +++-
 .../reactive/streams/reactive-streams.json         |  5 +-
 .../src/main/docs/reactive-streams-component.adoc  |  7 ++-
 .../reactive/streams/ReactiveStreamsComponent.java | 62 +++++++++++++++++++---
 .../engine/ReactiveStreamsEngineConfiguration.java |  7 +--
 .../ReactiveStreamsComponentBuilderFactory.java    | 54 +++++++++++++++++--
 6 files changed, 121 insertions(+), 24 deletions(-)

diff --git a/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java b/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
index 45af20a..842a9ae 100644
--- a/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
+++ b/components/camel-reactive-streams/src/generated/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponentConfigurer.java
@@ -21,12 +21,18 @@ public class ReactiveStreamsComponentConfigurer extends PropertyConfigurerSuppor
         case "basicPropertyBinding": 
target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); 
return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
-        case "internalengineconfiguration":
-        case "internalEngineConfiguration": 
target.setInternalEngineConfiguration(property(camelContext, 
org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration.class,
 value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
+        case "reactivestreamsengineconfiguration":
+        case "reactiveStreamsEngineConfiguration": 
target.setReactiveStreamsEngineConfiguration(property(camelContext, 
org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration.class,
 value)); return true;
         case "servicetype":
         case "serviceType": target.setServiceType(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "threadpoolmaxsize":
+        case "threadPoolMaxSize": 
target.setThreadPoolMaxSize(property(camelContext, int.class, value)); return 
true;
+        case "threadpoolminsize":
+        case "threadPoolMinSize": 
target.setThreadPoolMinSize(property(camelContext, int.class, value)); return 
true;
+        case "threadpoolname":
+        case "threadPoolName": target.setThreadPoolName(property(camelContext, 
java.lang.String.class, value)); return true;
         default: return false;
         }
     }
diff --git a/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json b/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
index 24bc99d..9bda3be 100644
--- a/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
+++ b/components/camel-reactive-streams/src/generated/resources/org/apache/camel/component/reactive/streams/reactive-streams.json
@@ -19,11 +19,14 @@
     "version": "3.2.0-SNAPSHOT"
   },
   "componentProperties": {
+    "threadPoolMaxSize": { "kind": "property", "displayName": "Thread Pool Max 
Size", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "10", "description": "The maximum number of threads used by the 
reactive streams internal engine." },
+    "threadPoolMinSize": { "kind": "property", "displayName": "Thread Pool Min 
Size", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"description": "The minimum number of threads used by the reactive streams 
internal engine." },
+    "threadPoolName": { "kind": "property", "displayName": "Thread Pool Name", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "CamelReactiveStreamsWorker", "description": "The name of the 
thread pool used by the reactive streams internal engine." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by [...]
     "backpressureStrategy": { "kind": "property", "displayName": "Backpressure 
Strategy", "group": "producer", "label": "producer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy",
 "enum": [ "BUFFER", "OLDEST", "LATEST" ], "deprecated": false, "secret": 
false, "defaultValue": "BUFFER", "description": "The backpressure strategy to 
use when pushing events to a slow subscriber." },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start 
Producer", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a producer may otherwise 
fail during starting and cause the r [...]
     "basicPropertyBinding": { "kind": "property", "displayName": "Basic 
Property Binding", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "description": "Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities" },
-    "internalEngineConfiguration": { "kind": "property", "displayName": 
"Internal Engine Configuration", "group": "advanced", "label": "advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration",
 "deprecated": false, "secret": false, "description": "Configures the internal 
engine for Reactive Streams." },
+    "reactiveStreamsEngineConfiguration": { "kind": "property", "displayName": 
"Reactive Streams Engine Configuration", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration",
 "deprecated": false, "secret": false, "description": "To use an existing 
reactive stream engine configuration." },
     "serviceType": { "kind": "property", "displayName": "Service Type", 
"group": "advanced", "label": "advanced", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"description": "Set the type of the underlying reactive streams implementation 
to use. The implementation is looked up from the registry or using a 
ServiceLoader, the default implementation is 
DefaultCamelReactiveStreamsService" }
   },
   "properties": {
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 1238d82..eb337b6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -49,18 +49,21 @@ external stream processing systems.
 
 
 // component options: START
-The Reactive Streams component supports 6 options, which are listed below.
+The Reactive Streams component supports 9 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *threadPoolMaxSize* (common) | The maximum number of threads used by the 
reactive streams internal engine. | 10 | int
+| *threadPoolMinSize* (common) | The minimum number of threads used by the 
reactive streams internal engine. |  | int
+| *threadPoolName* (common) | The name of the thread pool used by the reactive 
streams internal engine. | CamelReactiveStreamsWorker | String
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, that will be logged at WARN or ERROR level and ignored. | false | 
boolean
 | *backpressureStrategy* (producer) | The backpressure strategy to use when 
pushing events to a slow subscriber. The value can be one of: BUFFER, OLDEST, 
LATEST | BUFFER | ReactiveStreamsBackpressureStrategy
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy 
(on the first message). By starting lazy you can use this to allow CamelContext 
and routes to startup in situations where a producer may otherwise fail during 
starting and cause the route to fail being started. By deferring this startup 
to be lazy then the startup failure can be handled during routing messages via 
Camel's routing error handlers. Beware that when the first message is processed 
then creating and [...]
 | *basicPropertyBinding* (advanced) | Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities | false | boolean
-| *internalEngineConfiguration* (advanced) | Configures the internal engine 
for Reactive Streams. |  | ReactiveStreamsEngineConfiguration
+| *reactiveStreamsEngine Configuration* (advanced) | To use an existing 
reactive stream engine configuration. |  | ReactiveStreamsEngineConfiguration
 | *serviceType* (advanced) | Set the type of the underlying reactive streams 
implementation to use. The implementation is looked up from the registry or 
using a ServiceLoader, the default implementation is 
DefaultCamelReactiveStreamsService |  | String
 |===
 // component options: END
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index 417c4d4..0104033 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -32,12 +32,18 @@ import org.apache.camel.support.service.ServiceHelper;
  */
 @Component("reactive-streams")
 public class ReactiveStreamsComponent extends DefaultComponent {
-    @Metadata(label = "advanced")
-    private ReactiveStreamsEngineConfiguration internalEngineConfiguration = 
new ReactiveStreamsEngineConfiguration();
+    @Metadata(label = "common", defaultValue = "CamelReactiveStreamsWorker")
+    private String threadPoolName = "CamelReactiveStreamsWorker";
+    @Metadata(label = "common")
+    private int threadPoolMinSize;
+    @Metadata(label = "common", defaultValue = "10")
+    private int threadPoolMaxSize = 10;
     @Metadata(label = "producer", defaultValue = "BUFFER")
     private ReactiveStreamsBackpressureStrategy backpressureStrategy = 
ReactiveStreamsBackpressureStrategy.BUFFER;
     @Metadata(label = "advanced")
     private String serviceType;
+    @Metadata(label = "advanced")
+    private ReactiveStreamsEngineConfiguration 
reactiveStreamsEngineConfiguration;
 
     private CamelReactiveStreamsService service;
 
@@ -81,15 +87,15 @@ public class ReactiveStreamsComponent extends 
DefaultComponent {
     // Properties
     // ****************************************
 
-    public ReactiveStreamsEngineConfiguration getInternalEngineConfiguration() 
{
-        return internalEngineConfiguration;
+    public ReactiveStreamsEngineConfiguration 
getReactiveStreamsEngineConfiguration() {
+        return reactiveStreamsEngineConfiguration;
     }
 
     /**
-     * Configures the internal engine for Reactive Streams.
+     * To use an existing reactive stream engine configuration.
      */
-    public void 
setInternalEngineConfiguration(ReactiveStreamsEngineConfiguration 
internalEngineConfiguration) {
-        this.internalEngineConfiguration = internalEngineConfiguration;
+    public void 
setReactiveStreamsEngineConfiguration(ReactiveStreamsEngineConfiguration 
reactiveStreamsEngineConfiguration) {
+        this.reactiveStreamsEngineConfiguration = 
reactiveStreamsEngineConfiguration;
     }
 
     public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
@@ -118,17 +124,57 @@ public class ReactiveStreamsComponent extends 
DefaultComponent {
         this.serviceType = serviceType;
     }
 
+    public String getThreadPoolName() {
+        return threadPoolName;
+    }
+
+    /**
+     * The name of the thread pool used by the reactive streams internal 
engine.
+     */
+    public void setThreadPoolName(String threadPoolName) {
+        this.threadPoolName = threadPoolName;
+    }
+
+    public int getThreadPoolMinSize() {
+        return threadPoolMinSize;
+    }
+
+    /**
+     * The minimum number of threads used by the reactive streams internal 
engine.
+     */
+    public void setThreadPoolMinSize(int threadPoolMinSize) {
+        this.threadPoolMinSize = threadPoolMinSize;
+    }
+
+    public int getThreadPoolMaxSize() {
+        return threadPoolMaxSize;
+    }
+
+    /**
+     * The maximum number of threads used by the reactive streams internal 
engine.
+     */
+    public void setThreadPoolMaxSize(int threadPoolMaxSize) {
+        this.threadPoolMaxSize = threadPoolMaxSize;
+    }
+
     /**
      * Lazy creation of the CamelReactiveStreamsService
      *
      * @return the reactive streams service
      */
     public synchronized CamelReactiveStreamsService 
getReactiveStreamsService() {
+        if (reactiveStreamsEngineConfiguration == null) {
+            reactiveStreamsEngineConfiguration = new 
ReactiveStreamsEngineConfiguration();
+            
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
+            
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
+            
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
+        }
+
         if (service == null) {
             this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
                 getCamelContext(),
                 this.serviceType,
-                this.internalEngineConfiguration
+                this.reactiveStreamsEngineConfiguration
             );
 
             try {
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
index 0418f10..53af7c5 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/ReactiveStreamsEngineConfiguration.java
@@ -16,18 +16,13 @@
  */
 package org.apache.camel.component.reactive.streams.engine;
 
-import org.apache.camel.spi.Metadata;
-
 /**
  * Configuration parameters for the Camel internal reactive-streams engine.
  */
-public class ReactiveStreamsEngineConfiguration implements Cloneable {
+public class ReactiveStreamsEngineConfiguration {
 
-    @Metadata(defaultValue = "CamelReactiveStreamsWorker", description = "The 
name of the thread pool used by the reactive streams internal engine.")
     private String threadPoolName = "CamelReactiveStreamsWorker";
-    @Metadata(description = "The minimum number of threads used by the 
reactive streams internal engine.")
     private int threadPoolMinSize;
-    @Metadata(defaultValue = "10", description = "The maximum number of 
threads used by the reactive streams internal engine.")
     private int threadPoolMaxSize = 10;
 
     public ReactiveStreamsEngineConfiguration() {
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
index f3cfae5..7f27341 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ReactiveStreamsComponentBuilderFactory.java
@@ -49,6 +49,47 @@ public interface ReactiveStreamsComponentBuilderFactory {
             extends
                 ComponentBuilder<ReactiveStreamsComponent> {
         /**
+         * The maximum number of threads used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolMaxSize(
+                int threadPoolMaxSize) {
+            doSetProperty("threadPoolMaxSize", threadPoolMaxSize);
+            return this;
+        }
+        /**
+         * The minimum number of threads used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolMinSize(
+                int threadPoolMinSize) {
+            doSetProperty("threadPoolMinSize", threadPoolMinSize);
+            return this;
+        }
+        /**
+         * The name of the thread pool used by the reactive streams internal
+         * engine.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Default: CamelReactiveStreamsWorker
+         * Group: common
+         */
+        default ReactiveStreamsComponentBuilder threadPoolName(
+                java.lang.String threadPoolName) {
+            doSetProperty("threadPoolName", threadPoolName);
+            return this;
+        }
+        /**
          * Allows for bridging the consumer to the Camel routing Error Handler,
          * which mean any exceptions occurred while the consumer is trying to
          * pickup incoming messages, or the likes, will now be processed as a
@@ -118,16 +159,16 @@ public interface ReactiveStreamsComponentBuilderFactory {
             return this;
         }
         /**
-         * Configures the internal engine for Reactive Streams.
+         * To use an existing reactive stream engine configuration.
          * 
          * The option is a:
          * 
<code>org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration</code>
 type.
          * 
          * Group: advanced
          */
-        default ReactiveStreamsComponentBuilder internalEngineConfiguration(
-                
org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration
 internalEngineConfiguration) {
-            doSetProperty("internalEngineConfiguration", 
internalEngineConfiguration);
+        default ReactiveStreamsComponentBuilder 
reactiveStreamsEngineConfiguration(
+                
org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration
 reactiveStreamsEngineConfiguration) {
+            doSetProperty("reactiveStreamsEngineConfiguration", 
reactiveStreamsEngineConfiguration);
             return this;
         }
         /**
@@ -162,11 +203,14 @@ public interface ReactiveStreamsComponentBuilderFactory {
                 String name,
                 Object value) {
             switch (name) {
+            case "threadPoolMaxSize": ((ReactiveStreamsComponent) 
component).setThreadPoolMaxSize((int) value); return true;
+            case "threadPoolMinSize": ((ReactiveStreamsComponent) 
component).setThreadPoolMinSize((int) value); return true;
+            case "threadPoolName": ((ReactiveStreamsComponent) 
component).setThreadPoolName((java.lang.String) value); return true;
             case "bridgeErrorHandler": ((ReactiveStreamsComponent) 
component).setBridgeErrorHandler((boolean) value); return true;
             case "backpressureStrategy": ((ReactiveStreamsComponent) 
component).setBackpressureStrategy((org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy)
 value); return true;
             case "lazyStartProducer": ((ReactiveStreamsComponent) 
component).setLazyStartProducer((boolean) value); return true;
             case "basicPropertyBinding": ((ReactiveStreamsComponent) 
component).setBasicPropertyBinding((boolean) value); return true;
-            case "internalEngineConfiguration": ((ReactiveStreamsComponent) 
component).setInternalEngineConfiguration((org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration)
 value); return true;
+            case "reactiveStreamsEngineConfiguration": 
((ReactiveStreamsComponent) 
component).setReactiveStreamsEngineConfiguration((org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration)
 value); return true;
             case "serviceType": ((ReactiveStreamsComponent) 
component).setServiceType((java.lang.String) value); return true;
             default: return false;
             }

Reply via email to