This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit e3a35a81b6a3c3cfd933db46e7c5097a7b46f453 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Jul 19 09:18:32 2018 +0200 CAMEL-12664 - Camel-Nats: Bump to version 2.0.0 of Jnats --- .../camel-nats/src/main/docs/nats-component.adoc | 4 +- .../camel/component/nats/NatsConfiguration.java | 73 +++++++++------------- .../apache/camel/component/nats/NatsConstants.java | 1 - .../apache/camel/component/nats/NatsConsumer.java | 61 +++++++++--------- .../apache/camel/component/nats/NatsProducer.java | 24 +++---- .../camel/component/nats/NatsConsumerLoadTest.java | 7 ++- .../nats/NatsConsumerMaxMessagesQueueTest.java | 1 - .../nats/NatsConsumerMaxMessagesTest.java | 2 - .../camel/component/nats/NatsConsumerTest.java | 1 - parent/pom.xml | 2 +- 10 files changed, 77 insertions(+), 99 deletions(-) diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc index 1d3f78c..d356918 100644 --- a/components/camel-nats/src/main/docs/nats-component.adoc +++ b/components/camel-nats/src/main/docs/nats-component.adoc @@ -66,7 +66,7 @@ with the following path and query parameters: |=== -==== Query Parameters (22 parameters): +==== Query Parameters (20 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -91,9 +91,7 @@ with the following path and query parameters: | *replySubject* (producer) | the subject to which subscribers should send response | | String | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean | *secure* (security) | Set secure option indicating TLS is required | false | boolean -| *ssl* (security) | Whether or not using SSL | false | boolean | *sslContextParameters* (security) | To configure security using SSLContextParameters | | SSLContextParameters -| *tlsDebug* (security) | TLS Debug, it will add additional console output | false | boolean |=== // endpoint options: END // spring-boot-auto-configure options: START diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index 3e6fd20..5959a76 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -16,7 +16,8 @@ */ package org.apache.camel.component.nats; -import java.util.Properties; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; @@ -24,6 +25,9 @@ import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; import org.apache.camel.util.jsse.SSLContextParameters; +import io.nats.client.Options; +import io.nats.client.Options.Builder; + @UriParams public class NatsConfiguration { @@ -39,8 +43,6 @@ public class NatsConfiguration { private boolean pedantic; @UriParam private boolean verbose; - @UriParam(label = "security") - private boolean ssl; @UriParam(defaultValue = "2000") private int reconnectTimeWait = 2000; @UriParam(defaultValue = "3") @@ -64,8 +66,6 @@ public class NatsConfiguration { @UriParam(label = "security") private boolean secure; @UriParam(label = "security") - private boolean tlsDebug; - @UriParam(label = "security") private SSLContextParameters sslContextParameters; /** @@ -124,17 +124,6 @@ public class NatsConfiguration { } /** - * Whether or not using SSL - */ - public boolean getSsl() { - return ssl; - } - - public void setSsl(boolean ssl) { - this.ssl = ssl; - } - - /** * Waiting time before attempts reconnection (in milliseconds) */ public int getReconnectTimeWait() { @@ -257,17 +246,6 @@ public class NatsConfiguration { } /** - * TLS Debug, it will add additional console output - */ - public boolean isTlsDebug() { - return tlsDebug; - } - - public void setTlsDebug(boolean tlsDebug) { - this.tlsDebug = tlsDebug; - } - - /** * To configure security using SSLContextParameters */ public SSLContextParameters getSslContextParameters() { @@ -278,24 +256,29 @@ public class NatsConfiguration { this.sslContextParameters = sslContextParameters; } - private static <T> void addPropertyIfNotNull(Properties props, String key, T value) { - if (value != null) { - props.put(key, value); - } - } - - public Properties createProperties() { - Properties props = new Properties(); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URL, splitServers()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_VERBOSE, getVerbose()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PEDANTIC, getPedantic()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_SSL, getSsl()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT, getReconnect()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS, getMaxReconnectAttempts()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT_TIME_WAIT, getReconnectTimeWait()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PING_INTERVAL, getPingInterval()); - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_DONT_RANDOMIZE_SERVERS, getNoRandomizeServers()); - return props; + public Builder createOptions() throws NoSuchAlgorithmException, IllegalArgumentException { + Builder builder = new Options.Builder(); + builder.server(splitServers()); + if (getVerbose()) { + builder.verbose(); + } + if (getPedantic()) { + builder.pedantic(); + } + if (isSecure()) { + builder.secure(); + } + if (!getReconnect()) { + builder.noReconnect(); + } else { + builder.maxReconnects(getMaxReconnectAttempts()); + builder.reconnectWait(Duration.ofMillis(getReconnectTimeWait())); + } + builder.pingInterval(Duration.ofMillis(getPingInterval())); + if (getNoRandomizeServers()) { + builder.noRandomize(); + } + return builder; } private String splitServers() { diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java index 9bdee5d..2a3ea87 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java @@ -19,5 +19,4 @@ package org.apache.camel.component.nats; public interface NatsConstants { String NATS_MESSAGE_TIMESTAMP = "CamelNatsMessageTimestamp"; - String NATS_SUBSCRIPTION_ID = "CamelNatsSubscriptionId"; } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 95bc0e3..529c6f9 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -18,17 +18,19 @@ package org.apache.camel.component.nats; import java.io.IOException; import java.security.GeneralSecurityException; -import java.util.Properties; +import java.time.Duration; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLContext; import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; +import io.nats.client.Connection.Status; +import io.nats.client.Dispatcher; import io.nats.client.Message; import io.nats.client.MessageHandler; -import io.nats.client.Subscription; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.Options.Builder; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -44,8 +46,8 @@ public class NatsConsumer extends DefaultConsumer { private final Processor processor; private ExecutorService executor; private Connection connection; - private Subscription sid; - private boolean subscribed; + private Dispatcher dispatcher; + private boolean active; public NatsConsumer(NatsEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -75,11 +77,11 @@ public class NatsConsumer extends DefaultConsumer { if (getEndpoint().getNatsConfiguration().isFlushConnection()) { LOG.debug("Flushing Messages before stopping"); - connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout()); + connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout())); } try { - sid.unsubscribe(); + dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic()); } catch (Exception e) { getExceptionHandler().handleException("Error during unsubscribing", e); } @@ -95,31 +97,28 @@ public class NatsConsumer extends DefaultConsumer { executor = null; LOG.debug("Closing Nats Connection"); - if (!connection.isClosed()) { + if (!connection.getStatus().equals(Status.CLOSED)) { connection.close(); } } - private Connection getConnection() throws IOException, InterruptedException, TimeoutException, GeneralSecurityException { - Properties prop = getEndpoint().getNatsConfiguration().createProperties(); - ConnectionFactory factory = new ConnectionFactory(prop); + private Connection getConnection() throws InterruptedException, IllegalArgumentException, GeneralSecurityException, IOException { + Builder builder = getEndpoint().getNatsConfiguration().createOptions(); if (getEndpoint().getNatsConfiguration().getSslContextParameters() != null && getEndpoint().getNatsConfiguration().isSecure()) { SSLContext sslCtx = getEndpoint().getNatsConfiguration().getSslContextParameters().createSSLContext(getEndpoint().getCamelContext()); - factory.setSSLContext(sslCtx); - if (getEndpoint().getNatsConfiguration().isTlsDebug()) { - factory.setTlsDebug(getEndpoint().getNatsConfiguration().isTlsDebug()); - } + builder.sslContext(sslCtx); } - connection = factory.createConnection(); + Options options = builder.build(); + connection = Nats.connect(options); return connection; } - public boolean isSubscribed() { - return subscribed; + public boolean isActive() { + return active; } - public void setSubscribed(boolean subscribed) { - this.subscribed = subscribed; + public void setActive(boolean active) { + this.active = active; } class NatsConsumingTask implements Runnable { @@ -136,14 +135,13 @@ public class NatsConsumer extends DefaultConsumer { public void run() { try { if (ObjectHelper.isNotEmpty(configuration.getQueueName())) { - sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() { + dispatcher = connection.createDispatcher(new MessageHandler() { @Override public void onMessage(Message msg) { LOG.debug("Received Message: {}", msg); Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(msg); exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); - exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid); try { processor.process(exchange); } catch (Exception e) { @@ -151,21 +149,21 @@ public class NatsConsumer extends DefaultConsumer { } } }); + dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName()); if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { - sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); + dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } - if (sid.isValid()) { - setSubscribed(true); + if (dispatcher.isActive()) { + setActive(true); } } else { - sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() { + dispatcher = connection.createDispatcher(new MessageHandler() { @Override public void onMessage(Message msg) { LOG.debug("Received Message: {}", msg); Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(msg); exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); - exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid); try { processor.process(exchange); } catch (Exception e) { @@ -173,11 +171,12 @@ public class NatsConsumer extends DefaultConsumer { } } }); + dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic()); if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) { - sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); + dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages())); } - if (sid.isValid()) { - setSubscribed(true); + if (dispatcher.isActive()) { + setActive(true); } } } catch (Throwable e) { diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index 1be13e3..c8ac879 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -18,13 +18,18 @@ package org.apache.camel.component.nats; import java.io.IOException; import java.security.GeneralSecurityException; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLContext; import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; +import io.nats.client.Connection.Status; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.Options.Builder; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; @@ -77,26 +82,23 @@ public class NatsProducer extends DefaultProducer { LOG.debug("Stopping Nats Producer"); LOG.debug("Closing Nats Connection"); - if (connection != null && !connection.isClosed()) { + if (connection != null && !connection.getStatus().equals(Status.CLOSED)) { if (getEndpoint().getNatsConfiguration().isFlushConnection()) { LOG.debug("Flushing Nats Connection"); - connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout()); + connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout())); } connection.close(); } } - private Connection getConnection() throws TimeoutException, IOException, GeneralSecurityException { - Properties prop = getEndpoint().getNatsConfiguration().createProperties(); - ConnectionFactory factory = new ConnectionFactory(prop); + private Connection getConnection() throws InterruptedException, IllegalArgumentException, GeneralSecurityException, IOException { + Builder builder = getEndpoint().getNatsConfiguration().createOptions(); if (getEndpoint().getNatsConfiguration().getSslContextParameters() != null && getEndpoint().getNatsConfiguration().isSecure()) { SSLContext sslCtx = getEndpoint().getNatsConfiguration().getSslContextParameters().createSSLContext(getEndpoint().getCamelContext()); - factory.setSSLContext(sslCtx); - if (getEndpoint().getNatsConfiguration().isTlsDebug()) { - factory.setTlsDebug(getEndpoint().getNatsConfiguration().isTlsDebug()); - } + builder.sslContext(sslCtx); } - connection = factory.createConnection(); + Options options = builder.build(); + connection = Nats.connect(options); return connection; } diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java index d1a0350..830e031 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; +import io.nats.client.Nats; +import io.nats.client.Options; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; @@ -38,8 +39,8 @@ public class NatsConsumerLoadTest extends CamelTestSupport { @Test public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException { mockResultEndpoint.setExpectedMessageCount(10000); - ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222"); - Connection connection = cf.createConnection(); + Options options = new Options.Builder().server("nats://localhost:4222").build(); + Connection connection = Nats.connect(options); for (int i = 0; i < 10000; i++) { connection.publish("test", ("test" + i).getBytes()); diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java index b69a6b7..5c42eae 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java @@ -33,7 +33,6 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport { @Test public void testMaxConsumer() throws InterruptedException, IOException { - mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}"); mockResultEndpoint.setExpectedMessageCount(2); template.sendBody("direct:send", "test"); diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java index 7f4d434..d822a97 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java @@ -33,8 +33,6 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport { @Test public void testMaxConsumer() throws InterruptedException, IOException { - mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", - "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}"); mockResultEndpoint.setExpectedMessageCount(5); template.sendBody("direct:send", "test"); template.sendBody("direct:send", "test1"); diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java index 24d4877..86294f1 100644 --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java @@ -34,7 +34,6 @@ public class NatsConsumerTest extends CamelTestSupport { @Test public void testConsumer() throws InterruptedException, IOException { mockResultEndpoint.expectedMessageCount(1); - mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}"); template.requestBody("direct:send", "test"); mockResultEndpoint.assertIsSatisfied(); diff --git a/parent/pom.xml b/parent/pom.xml index 17a49f1..96a0bf1 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -346,7 +346,7 @@ <java-util-version>1.34.0</java-util-version> <java-util-bundle-version>1.34.0_1</java-util-bundle-version> <jna-version>4.2.2</jna-version> - <jnats-version>1.0</jnats-version> + <jnats-version>2.0.0</jnats-version> <javacc-maven-plugin-version>2.6</javacc-maven-plugin-version> <javacrumbs-version>0.22</javacrumbs-version> <javapoet-version>1.11.1</javapoet-version>