This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6ea3733a3c6 CAMEL-16483: camel-nats: Add jetstream support 6ea3733a3c6 is described below commit 6ea3733a3c66be708d659e04b2d66a7298eb494b Author: Vineet Saurabh <vineetsaur...@gmail.com> AuthorDate: Wed Jun 4 07:45:12 2025 +0200 CAMEL-16483: camel-nats: Add jetstream support * CAMEL-16483 Producer with Jetstream * CAMEL-16483 Support for endpoints for new parameters * CAMEL-16483 Fix subject overlap error * NATS Jetstream consumer Example Route: from("nats://my.topic?" + "jetstreamEnabled=true") // Subscribe to the 'my.topic' subject .process(exchange -> { String message = exchange.getIn().getBody(String.class); System.out.println("Received message on 'my.topic': " + message); }); Send Message through CLI: nats pub my.topic "Hello from NATS on my.topic" * Camel 16483 Fix from comments on PR #18212 * Camel 16483 remove * from imports * Camel 16483 Jetstream default disable * Camel 16483 Publish method for Jetstream * Breaking publish up into 2 methods, so one is jetstream and the other is non jetstream and as-is today * json and NatsEndpointConfigurer after mvn clean install -Dquickly * Camel 16483 coding standards and style run mvn clean install -DskipTests locally from root folder and I have committed all auto-generated changes * CAMEL-16483 Consumer refactoring and minor changes * Camel 16483 Exception handling --------- Co-authored-by: Vineet Saurabh <vineet.saur...@bbraun.com> Co-authored-by: Claus Ibsen <claus.ib...@gmail.com> --- .../org/apache/camel/catalog/components/nats.json | 57 +++--- .../component/nats/NatsEndpointConfigurer.java | 18 ++ .../component/nats/NatsEndpointUriFactory.java | 5 +- .../org/apache/camel/component/nats/nats.json | 57 +++--- .../camel/component/nats/NatsConfiguration.java | 49 +++++ .../apache/camel/component/nats/NatsConsumer.java | 105 +++++++--- .../apache/camel/component/nats/NatsProducer.java | 91 ++++++++- .../endpoint/dsl/NatsEndpointBuilderFactory.java | 222 +++++++++++++++++++++ 8 files changed, 515 insertions(+), 89 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json index 4edd4ca24f9..4878c6d3362 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json @@ -46,32 +46,35 @@ "connectionTimeout": { "index": 1, "kind": "parameter", "displayName": "Connection Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Timeout for connection attempts. (in milliseconds)" }, "flushConnection": { "index": 2, "kind": "parameter", "displayName": "Flush Connection", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Define if we want to flush connection when stopping or not" }, "flushTimeout": { "index": 3, "kind": "parameter", "displayName": "Flush Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set the flush timeout (in milliseconds)" }, - "maxPingsOut": { "index": 4, "kind": "parameter", "displayName": "Max Pings Out", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "maximum number of pings have not received a response allowed by the client" }, - "maxReconnectAttempts": { "index": 5, "kind": "parameter", "displayName": "Max Reconnect Attempts", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Max reconnection attempts" }, - "noEcho": { "index": 6, "kind": "parameter", "displayName": "No Echo", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Turn off echo. If supported by the gnatsd version you are connecting to this flag will prevent the server from ec [...] - "noRandomizeServers": { "index": 7, "kind": "parameter", "displayName": "No Randomize Servers", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not randomizing the order of servers for the connection attempts" }, - "pedantic": { "index": 8, "kind": "parameter", "displayName": "Pedantic", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in pedantic mode (this affects performance)" }, - "pingInterval": { "index": 9, "kind": "parameter", "displayName": "Ping Interval", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 120000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Ping interval to be aware if connection is still alive (in milliseconds)" }, - "reconnect": { "index": 10, "kind": "parameter", "displayName": "Reconnect", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not using reconnection feature" }, - "reconnectTimeWait": { "index": 11, "kind": "parameter", "displayName": "Reconnect Time Wait", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Waiting time before attempts reconnection (in milliseconds)" }, - "requestCleanupInterval": { "index": 12, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, - "servers": { "index": 13, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." }, - "verbose": { "index": 14, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, - "maxMessages": { "index": 15, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "poolSize": { "index": 16, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "queueName": { "index": 17, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 18, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 19, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 20, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 21, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "replySubject": { "index": 22, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 23, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 24, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "connection": { "index": 25, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 26, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "traceConnection": { "index": 27, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] - "credentialsFilePath": { "index": 28, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] - "secure": { "index": 29, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 30, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "jetstreamEnabled": { "index": 4, "kind": "parameter", "displayName": "Jetstream Enabled", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to enable JetStream support for this endpoint." }, + "jetstreamName": { "index": 5, "kind": "parameter", "displayName": "Jetstream Name", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name of the JetStream stream to use." }, + "maxPingsOut": { "index": 6, "kind": "parameter", "displayName": "Max Pings Out", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "maximum number of pings have not received a response allowed by the client" }, + "maxReconnectAttempts": { "index": 7, "kind": "parameter", "displayName": "Max Reconnect Attempts", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Max reconnection attempts" }, + "noEcho": { "index": 8, "kind": "parameter", "displayName": "No Echo", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Turn off echo. If supported by the gnatsd version you are connecting to this flag will prevent the server from ec [...] + "noRandomizeServers": { "index": 9, "kind": "parameter", "displayName": "No Randomize Servers", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not randomizing the order of servers for the connection attempts" }, + "pedantic": { "index": 10, "kind": "parameter", "displayName": "Pedantic", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in pedantic mode (this affects performance)" }, + "pingInterval": { "index": 11, "kind": "parameter", "displayName": "Ping Interval", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 120000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Ping interval to be aware if connection is still alive (in milliseconds)" }, + "reconnect": { "index": 12, "kind": "parameter", "displayName": "Reconnect", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not using reconnection feature" }, + "reconnectTimeWait": { "index": 13, "kind": "parameter", "displayName": "Reconnect Time Wait", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Waiting time before attempts reconnection (in milliseconds)" }, + "requestCleanupInterval": { "index": 14, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, + "servers": { "index": 15, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." }, + "verbose": { "index": 16, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, + "maxMessages": { "index": 17, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "poolSize": { "index": 18, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "queueName": { "index": 19, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 20, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 21, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 22, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 23, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "replySubject": { "index": 24, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 25, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "connection": { "index": 27, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 30, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] + "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] + "secure": { "index": 32, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 33, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java index 223199a3713..aebff95666e 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java @@ -40,6 +40,12 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "flushTimeout": target.getConfiguration().setFlushTimeout(property(camelContext, int.class, value)); return true; case "headerfilterstrategy": case "headerFilterStrategy": target.getConfiguration().setHeaderFilterStrategy(property(camelContext, org.apache.camel.spi.HeaderFilterStrategy.class, value)); return true; + case "jetstreamasync": + case "jetstreamAsync": target.getConfiguration().setJetstreamAsync(property(camelContext, boolean.class, value)); return true; + case "jetstreamenabled": + case "jetstreamEnabled": target.getConfiguration().setJetstreamEnabled(property(camelContext, boolean.class, value)); return true; + case "jetstreamname": + case "jetstreamName": target.getConfiguration().setJetstreamName(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "maxmessages": @@ -101,6 +107,12 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "flushTimeout": return int.class; case "headerfilterstrategy": case "headerFilterStrategy": return org.apache.camel.spi.HeaderFilterStrategy.class; + case "jetstreamasync": + case "jetstreamAsync": return boolean.class; + case "jetstreamenabled": + case "jetstreamEnabled": return boolean.class; + case "jetstreamname": + case "jetstreamName": return java.lang.String.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; case "maxmessages": @@ -163,6 +175,12 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "flushTimeout": return target.getConfiguration().getFlushTimeout(); case "headerfilterstrategy": case "headerFilterStrategy": return target.getConfiguration().getHeaderFilterStrategy(); + case "jetstreamasync": + case "jetstreamAsync": return target.getConfiguration().isJetstreamAsync(); + case "jetstreamenabled": + case "jetstreamEnabled": return target.getConfiguration().isJetstreamEnabled(); + case "jetstreamname": + case "jetstreamName": return target.getConfiguration().getJetstreamName(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "maxmessages": diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java index 793f592a818..064c5610f7b 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java @@ -23,7 +23,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(31); + Set<String> props = new HashSet<>(34); props.add("bridgeErrorHandler"); props.add("connection"); props.add("connectionTimeout"); @@ -33,6 +33,9 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E props.add("flushConnection"); props.add("flushTimeout"); props.add("headerFilterStrategy"); + props.add("jetstreamAsync"); + props.add("jetstreamEnabled"); + props.add("jetstreamName"); props.add("lazyStartProducer"); props.add("maxMessages"); props.add("maxPingsOut"); diff --git a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json index 4edd4ca24f9..4878c6d3362 100644 --- a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json +++ b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json @@ -46,32 +46,35 @@ "connectionTimeout": { "index": 1, "kind": "parameter", "displayName": "Connection Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Timeout for connection attempts. (in milliseconds)" }, "flushConnection": { "index": 2, "kind": "parameter", "displayName": "Flush Connection", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Define if we want to flush connection when stopping or not" }, "flushTimeout": { "index": 3, "kind": "parameter", "displayName": "Flush Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set the flush timeout (in milliseconds)" }, - "maxPingsOut": { "index": 4, "kind": "parameter", "displayName": "Max Pings Out", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "maximum number of pings have not received a response allowed by the client" }, - "maxReconnectAttempts": { "index": 5, "kind": "parameter", "displayName": "Max Reconnect Attempts", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Max reconnection attempts" }, - "noEcho": { "index": 6, "kind": "parameter", "displayName": "No Echo", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Turn off echo. If supported by the gnatsd version you are connecting to this flag will prevent the server from ec [...] - "noRandomizeServers": { "index": 7, "kind": "parameter", "displayName": "No Randomize Servers", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not randomizing the order of servers for the connection attempts" }, - "pedantic": { "index": 8, "kind": "parameter", "displayName": "Pedantic", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in pedantic mode (this affects performance)" }, - "pingInterval": { "index": 9, "kind": "parameter", "displayName": "Ping Interval", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 120000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Ping interval to be aware if connection is still alive (in milliseconds)" }, - "reconnect": { "index": 10, "kind": "parameter", "displayName": "Reconnect", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not using reconnection feature" }, - "reconnectTimeWait": { "index": 11, "kind": "parameter", "displayName": "Reconnect Time Wait", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Waiting time before attempts reconnection (in milliseconds)" }, - "requestCleanupInterval": { "index": 12, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, - "servers": { "index": 13, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." }, - "verbose": { "index": 14, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, - "maxMessages": { "index": 15, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "poolSize": { "index": 16, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "queueName": { "index": 17, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 18, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 19, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 20, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 21, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "replySubject": { "index": 22, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 23, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 24, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "connection": { "index": 25, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "headerFilterStrategy": { "index": 26, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, - "traceConnection": { "index": 27, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] - "credentialsFilePath": { "index": 28, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] - "secure": { "index": 29, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 30, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "jetstreamEnabled": { "index": 4, "kind": "parameter", "displayName": "Jetstream Enabled", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to enable JetStream support for this endpoint." }, + "jetstreamName": { "index": 5, "kind": "parameter", "displayName": "Jetstream Name", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name of the JetStream stream to use." }, + "maxPingsOut": { "index": 6, "kind": "parameter", "displayName": "Max Pings Out", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "maximum number of pings have not received a response allowed by the client" }, + "maxReconnectAttempts": { "index": 7, "kind": "parameter", "displayName": "Max Reconnect Attempts", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Max reconnection attempts" }, + "noEcho": { "index": 8, "kind": "parameter", "displayName": "No Echo", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Turn off echo. If supported by the gnatsd version you are connecting to this flag will prevent the server from ec [...] + "noRandomizeServers": { "index": 9, "kind": "parameter", "displayName": "No Randomize Servers", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not randomizing the order of servers for the connection attempts" }, + "pedantic": { "index": 10, "kind": "parameter", "displayName": "Pedantic", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in pedantic mode (this affects performance)" }, + "pingInterval": { "index": 11, "kind": "parameter", "displayName": "Ping Interval", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 120000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Ping interval to be aware if connection is still alive (in milliseconds)" }, + "reconnect": { "index": 12, "kind": "parameter", "displayName": "Reconnect", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not using reconnection feature" }, + "reconnectTimeWait": { "index": 13, "kind": "parameter", "displayName": "Reconnect Time Wait", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Waiting time before attempts reconnection (in milliseconds)" }, + "requestCleanupInterval": { "index": 14, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, + "servers": { "index": 15, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." }, + "verbose": { "index": 16, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, + "maxMessages": { "index": 17, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "poolSize": { "index": 18, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "queueName": { "index": 19, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 20, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 21, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 22, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 23, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "replySubject": { "index": 24, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 25, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "connection": { "index": 27, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." }, + "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "traceConnection": { "index": 30, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] + "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] + "secure": { "index": 32, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 33, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index b41fa518c34..1f9d0d3fded 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -90,6 +90,12 @@ public class NatsConfiguration { private boolean traceConnection; @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy(); + @UriParam(label = "common", defaultValue = "false") + private boolean jetstreamEnabled = false; + @UriParam(label = "common") + private String jetstreamName; + @UriParam(label = "advanced", defaultValue = "true") + private boolean jetstreamAsync = true; /** * URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers. @@ -454,4 +460,47 @@ public class NatsConfiguration { public void setTraceConnection(boolean traceConnection) { this.traceConnection = traceConnection; } + + /** + * Whether to enable JetStream support for this endpoint. + */ + public boolean isJetstreamEnabled() { + return jetstreamEnabled; + } + + /** + * Sets whether to enable JetStream support for this endpoint. + */ + public void setJetstreamEnabled(boolean jetstreamEnabled) { + this.jetstreamEnabled = jetstreamEnabled; + } + + /** + * The name of the JetStream stream to use. If not specified, a default stream will be used if JetStream is enabled. + */ + public String getJetstreamName() { + return jetstreamName; + } + + /** + * Sets the name of the JetStream stream to use. + */ + public void setJetstreamName(String jetstreamName) { + this.jetstreamName = jetstreamName; + } + + /** + * Whether to operate JetStream requests asynchronously. This can improve performance but requires careful handling + * of results. + */ + public boolean isJetstreamAsync() { + return jetstreamAsync; + } + + /** + * Sets whether to operate JetStream requests asynchronously. + */ + public void setJetstreamAsync(boolean jetstreamAsync) { + this.jetstreamAsync = jetstreamAsync; + } } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 31ff448a852..5bfaa962960 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -16,14 +16,21 @@ */ package org.apache.camel.component.nats; +import java.io.IOException; import java.time.Duration; import java.util.concurrent.ExecutorService; import io.nats.client.Connection; import io.nats.client.Connection.Status; import io.nats.client.Dispatcher; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.JetStreamSubscription; import io.nats.client.Message; import io.nats.client.MessageHandler; +import io.nats.client.PushSubscribeOptions; +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.StreamConfiguration; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.spi.HeaderFilterStrategy; @@ -41,6 +48,7 @@ public class NatsConsumer extends DefaultConsumer { private Connection connection; private Dispatcher dispatcher; private boolean active; + private JetStreamSubscription jetStreamSubscription; public NatsConsumer(NatsEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -123,30 +131,19 @@ public class NatsConsumer extends DefaultConsumer { @Override public void run() { try { - NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); - if (ObjectHelper.isNotEmpty(this.configuration.getQueueName())) { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - NatsConsumer.this.getEndpoint().getConfiguration().getQueueName()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); - } - if (NatsConsumer.this.dispatcher.isActive()) { - NatsConsumer.this.setActive(true); - } + NatsConfiguration config = getEndpoint().getConfiguration(); + String topic = config.getTopic(); + String queueName = config.getQueueName(); + String maxMessagesStr = config.getMaxMessages(); + Integer maxMessages = null; + if (ObjectHelper.isNotEmpty(maxMessagesStr)) { + maxMessages = Integer.parseInt(maxMessagesStr); + } + + if (config.isJetstreamEnabled() && connection.getServerInfo().isJetStreamAvailable()) { + setupJetStreamConsumer(topic, queueName); } else { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher - .subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); - } - if (NatsConsumer.this.dispatcher.isActive()) { - NatsConsumer.this.setActive(true); - } + setupStandardNatsConsumer(topic, queueName, maxMessages); } } catch (final Exception e) { NatsConsumer.this.getExceptionHandler().handleException("Error during processing", e); @@ -154,6 +151,68 @@ public class NatsConsumer extends DefaultConsumer { } + private void setupJetStreamConsumer(String topic, String queueName) throws IOException, JetStreamApiException { + String streamName = this.configuration.getJetstreamName(); + String consumerName + = ObjectHelper.isNotEmpty(queueName) ? queueName : "consumer-" + System.currentTimeMillis(); // Generate a default consumer name if queueName is not provided + LOG.debug("Setting up JetStream PUSH consumer for stream: '{}', durable: '{}', topic: {} ", streamName, + consumerName, this.configuration.getTopic()); + + JetStreamManagement jsm = connection.jetStreamManagement(); + try { + // Try to get the stream, create it if it doesn't exist + jsm.getStreamInfo(streamName); + } catch (JetStreamApiException e) { + if (e.getErrorCode() == 404) { + StreamConfiguration streamConfig = StreamConfiguration.builder() + .name(streamName) + .subjects(topic) + .build(); + jsm.addStream(streamConfig); + } else { + throw e; + } + } + + ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder() + .durable(consumerName); + + ccBuilder.deliverSubject(null); + ConsumerConfiguration cc = ccBuilder.build(); + + PushSubscribeOptions pushOptions = PushSubscribeOptions.builder() + .configuration(cc) + .build(); + + NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); + + NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe( + NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), + queueName, + dispatcher, + new CamelNatsMessageHandler(), + true, + pushOptions); + + NatsConsumer.this.setActive(true); + } + + private void setupStandardNatsConsumer(String topic, String queueName, Integer maxMessages) { + LOG.debug("Setting up standard NATS consumer for topic: {}", topic); + NatsConsumer.this.dispatcher = connection.createDispatcher(new CamelNatsMessageHandler()); + if (ObjectHelper.isNotEmpty(queueName)) { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic, queueName); + } else { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic); + } + if (maxMessages != null) { + NatsConsumer.this.dispatcher.unsubscribe(topic, maxMessages); + } + if (NatsConsumer.this.dispatcher.isActive()) { + NatsConsumer.this.setActive(true); + } + } + class CamelNatsMessageHandler implements MessageHandler { @Override diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index a672763d127..3fc92ceef7c 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -16,15 +16,23 @@ */ package org.apache.camel.component.nats; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import io.nats.client.Connection; import io.nats.client.Connection.Status; +import io.nats.client.JetStream; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; import io.nats.client.Message; +import io.nats.client.api.PublishAck; +import io.nats.client.api.StreamConfiguration; import io.nats.client.impl.Headers; import io.nats.client.impl.NatsMessage; import org.apache.camel.AsyncCallback; @@ -104,18 +112,18 @@ public class NatsProducer extends DefaultAsyncProducer { }); return false; } else { - LOG.debug("Publishing to topic: {}", config.getTopic()); - - final NatsMessage.Builder builder = NatsMessage.builder() - .data(body) - .subject(config.getTopic()) - .headers(this.buildHeaders(exchange)); - - if (ObjectHelper.isNotEmpty(config.getReplySubject())) { - final String replySubject = config.getReplySubject(); - builder.replyTo(replySubject); + LOG.debug("Publishing to subject: {}", config.getTopic()); + try { + if (config.isJetstreamEnabled() && this.connection.getServerInfo().isJetStreamAvailable()) { + publishWithJetStream(config, body, exchange); + } else { + publishWithoutJetStream(config, body, exchange); + } + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; } - this.connection.publish(builder.build()); callback.done(true); return true; } @@ -193,4 +201,65 @@ public class NatsProducer extends DefaultAsyncProducer { super.doStop(); } + private void publishWithJetStream(NatsConfiguration config, final byte[] body, final Exchange exchange) throws Exception { + LOG.debug("JetStream is available"); + JetStreamManagement jsm; + JetStream js; + try { + jsm = this.connection.jetStreamManagement(); + js = jsm.jetStream(); + if (js == null) { + jsm.addStream(StreamConfiguration.builder() + .name(config.getJetstreamName()) + .subjects(config.getTopic()) + .build()); + js = jsm.jetStream(); + } + } catch (IOException | JetStreamApiException e) { + throw new Exception("Failed to initialize JetStream: " + e.getMessage(), e); + } + + final NatsMessage.Builder builder = NatsMessage.builder() + .data(body) + .subject(config.getTopic()) + .headers(this.buildHeaders(exchange)); + + if (ObjectHelper.isNotEmpty(config.getReplySubject())) { + final String replySubject = config.getReplySubject(); + builder.replyTo(replySubject); + } + final Message jetStreamMessage = builder.build(); + + PublishAck pa; + if (config.isJetstreamAsync()) { + CompletableFuture<PublishAck> future = js.publishAsync(jetStreamMessage); + try { + pa = future.get(config.getRequestTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new Exception("Failed to publish message asynchronously with JetStream: " + e.getMessage(), e); + } + LOG.debug("Publish Sequence async: {}", pa.getSeqno()); + } else { + try { + pa = js.publish(jetStreamMessage); + } catch (IOException | JetStreamApiException e) { + throw new Exception("Failed to publish message synchronously with JetStream: " + e.getMessage(), e); + } + LOG.debug("Publish Sequence synchronously: {}", pa.getSeqno()); + } + } + + private void publishWithoutJetStream(NatsConfiguration config, final byte[] body, final Exchange exchange) { + final NatsMessage.Builder builder = NatsMessage.builder() + .data(body) + .subject(config.getTopic()) + .headers(this.buildHeaders(exchange)); + + if (ObjectHelper.isNotEmpty(config.getReplySubject())) { + final String replySubject = config.getReplySubject(); + builder.replyTo(replySubject); + } + this.connection.publish(builder.build()); + } + } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java index 4b8fde899ac..8d49ad500d5 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java @@ -133,6 +133,50 @@ public interface NatsEndpointBuilderFactory { doSetProperty("flushTimeout", flushTimeout); return this; } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder jetstreamEnabled(boolean jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder jetstreamEnabled(String jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets the name of the JetStream stream to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + * + * @param jetstreamName the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder jetstreamName(String jetstreamName) { + doSetProperty("jetstreamName", jetstreamName); + return this; + } /** * maximum number of pings have not received a response allowed by the * client. @@ -813,6 +857,36 @@ public interface NatsEndpointBuilderFactory { doSetProperty("headerFilterStrategy", headerFilterStrategy); return this; } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointConsumerBuilder jetstreamAsync(boolean jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointConsumerBuilder jetstreamAsync(String jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } /** * Whether or not connection trace messages should be printed to * standard out for fine grained debugging of connection issues. @@ -947,6 +1021,50 @@ public interface NatsEndpointBuilderFactory { doSetProperty("flushTimeout", flushTimeout); return this; } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointProducerBuilder jetstreamEnabled(boolean jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointProducerBuilder jetstreamEnabled(String jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets the name of the JetStream stream to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + * + * @param jetstreamName the value to set + * @return the dsl builder + */ + default NatsEndpointProducerBuilder jetstreamName(String jetstreamName) { + doSetProperty("jetstreamName", jetstreamName); + return this; + } /** * maximum number of pings have not received a response allowed by the * client. @@ -1510,6 +1628,36 @@ public interface NatsEndpointBuilderFactory { doSetProperty("headerFilterStrategy", headerFilterStrategy); return this; } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointProducerBuilder jetstreamAsync(boolean jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointProducerBuilder jetstreamAsync(String jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } /** * Whether or not connection trace messages should be printed to * standard out for fine grained debugging of connection issues. @@ -1645,6 +1793,50 @@ public interface NatsEndpointBuilderFactory { doSetProperty("flushTimeout", flushTimeout); return this; } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointBuilder jetstreamEnabled(boolean jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets whether to enable JetStream support for this endpoint. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: common + * + * @param jetstreamEnabled the value to set + * @return the dsl builder + */ + default NatsEndpointBuilder jetstreamEnabled(String jetstreamEnabled) { + doSetProperty("jetstreamEnabled", jetstreamEnabled); + return this; + } + /** + * Sets the name of the JetStream stream to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + * + * @param jetstreamName the value to set + * @return the dsl builder + */ + default NatsEndpointBuilder jetstreamName(String jetstreamName) { + doSetProperty("jetstreamName", jetstreamName); + return this; + } /** * maximum number of pings have not received a response allowed by the * client. @@ -2121,6 +2313,36 @@ public interface NatsEndpointBuilderFactory { doSetProperty("headerFilterStrategy", headerFilterStrategy); return this; } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointBuilder jetstreamAsync(boolean jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } + /** + * Sets whether to operate JetStream requests asynchronously. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: advanced + * + * @param jetstreamAsync the value to set + * @return the dsl builder + */ + default AdvancedNatsEndpointBuilder jetstreamAsync(String jetstreamAsync) { + doSetProperty("jetstreamAsync", jetstreamAsync); + return this; + } /** * Whether or not connection trace messages should be printed to * standard out for fine grained debugging of connection issues.