Repository: camel Updated Branches: refs/heads/master 4e0437c40 -> e50136b7d
Polished Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e50136b7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e50136b7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e50136b7 Branch: refs/heads/master Commit: e50136b7d9ed6f217346ec4a6b4f13e2fa17e52b Parents: 4e0437c Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 19 10:07:17 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 19 10:08:35 2016 +0100 ---------------------------------------------------------------------- .../camel/component/nats/NatsConfiguration.java | 71 ++++++++++++-------- .../camel/component/nats/NatsConsumer.java | 19 +++--- .../camel/component/nats/NatsEndpoint.java | 11 +-- .../camel/component/nats/NatsProducer.java | 12 ++-- 4 files changed, 59 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index 43182b9..260d1a7 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -26,9 +26,11 @@ import org.apache.camel.spi.UriPath; @UriParams public class NatsConfiguration { - @UriPath @Metadata(required = "true") + @UriPath + @Metadata(required = "true") private String servers; - @UriParam @Metadata(required = "true") + @UriParam + @Metadata(required = "true") private String topic; @UriParam(defaultValue = "true") private boolean reconnect = true; @@ -52,143 +54,156 @@ public class NatsConfiguration { private String maxMessages; @UriParam(label = "consumer", defaultValue = "10") private int poolSize = 10; - + /** - * The Nats servers + * URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers. */ public String getServers() { return servers; } + public void setServers(String servers) { this.servers = servers; } - + /** * The name of topic we want to use - */ + */ public String getTopic() { return topic; } + public void setTopic(String topic) { this.topic = topic; } - + /** * Whether or not using reconnection feature - */ + */ public boolean getReconnect() { return reconnect; } + public void setReconnect(boolean reconnect) { this.reconnect = reconnect; } - + /** * Whether or not running in pedantic mode (this affects performace) - */ + */ public boolean getPedantic() { return pedantic; } + public void setPedantic(boolean pedantic) { this.pedantic = pedantic; } - + /** * Whether or not running in verbose mode - */ + */ public boolean getVerbose() { return verbose; } + public void setVerbose(boolean verbose) { this.verbose = verbose; } - + /** * Whether or not using SSL - */ + */ public boolean getSsl() { return ssl; } + public void setSsl(boolean ssl) { this.ssl = ssl; } - + /** * Waiting time before attempts reconnection (in milliseconds) - */ + */ public int getReconnectTimeWait() { return reconnectTimeWait; } + public void setReconnectTimeWait(int reconnectTimeWait) { this.reconnectTimeWait = reconnectTimeWait; } - + /** * Max reconnection attempts - */ + */ public int getMaxReconnectAttempts() { return maxReconnectAttempts; } + public void setMaxReconnectAttempts(int maxReconnectAttempts) { this.maxReconnectAttempts = maxReconnectAttempts; } - + /** * Ping interval to be aware if connection is still alive (in milliseconds) */ public int getPingInterval() { return pingInterval; } + public void setPingInterval(int pingInterval) { this.pingInterval = pingInterval; } - + /** * Whether or not randomizing the order of servers for the connection attempts */ public boolean getNoRandomizeServers() { return noRandomizeServers; } + public void setNoRandomizeServers(boolean noRandomizeServers) { this.noRandomizeServers = noRandomizeServers; } - + /** * The Queue name if we are using nats for a queue configuration */ public String getQueueName() { return queueName; } + public void setQueueName(String queueName) { this.queueName = queueName; } - + /** * Stop receiving messages from a topic we are subscribing to after maxMessages */ public String getMaxMessages() { return maxMessages; } + public void setMaxMessages(String maxMessages) { this.maxMessages = maxMessages; } - + /** * Consumer pool size */ public int getPoolSize() { return poolSize; } + public void setPoolSize(int poolSize) { this.poolSize = poolSize; } - + private static <T> void addPropertyIfNotNull(Properties props, String key, T value) { if (value != null) { props.put(key, value); } } - + public Properties createProperties() { Properties props = new Properties(); addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URI, splitServers()); @@ -202,18 +217,18 @@ public class NatsConfiguration { addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_DONT_RANDOMIZE_SERVERS, getNoRandomizeServers()); return props; } - + public Properties createSubProperties() { Properties props = new Properties(); addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName()); addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages()); return props; } - + private String splitServers() { StringBuilder servers = new StringBuilder(); String prefix = "nats://"; - + String[] pieces = getServers().split(","); for (int i = 0; i < pieces.length; i++) { if (i < pieces.length - 1) { http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index e63f485..029ab13 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -32,15 +32,13 @@ public class NatsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class); - protected ExecutorService executor; - private final NatsEndpoint endpoint; private final Processor processor; + private ExecutorService executor; private Connection connection; private int sid; public NatsConsumer(NatsEndpoint endpoint, Processor processor) { super(endpoint, processor); - this.endpoint = endpoint; this.processor = processor; } @@ -53,18 +51,20 @@ public class NatsConsumer extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); LOG.debug("Starting Nats Consumer"); - executor = endpoint.createExecutor(); + executor = getEndpoint().createExecutor(); LOG.debug("Getting Nats Connection"); connection = getConnection(); executor.submit(new NatsConsumingTask(connection, getEndpoint().getNatsConfiguration())); - } @Override protected void doStop() throws Exception { super.doStop(); + + // TODO: Should we not unsubscribe first? + LOG.debug("Stopping Nats Consumer"); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { @@ -82,17 +82,14 @@ public class NatsConsumer extends DefaultConsumer { } private Connection getConnection() throws IOException, InterruptedException { - Properties prop = getEndpoint().getNatsConfiguration().createProperties(); connection = Connection.connect(prop); - return connection; } class NatsConsumingTask implements Runnable { private final Connection connection; - private final NatsConfiguration configuration; public NatsConsumingTask(Connection connection, NatsConfiguration configuration) { @@ -105,7 +102,7 @@ public class NatsConsumer extends DefaultConsumer { try { sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() { public void execute(String msg) { - LOG.debug("Received Message: " + msg); + LOG.debug("Received Message: {}", msg); Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(msg); exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); @@ -117,8 +114,8 @@ public class NatsConsumer extends DefaultConsumer { } } }); - } catch (IOException e1) { - getExceptionHandler().handleException("Error during processing", e1); + } catch (Throwable e) { + getExceptionHandler().handleException("Error during processing", e); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java index 6a076d4..160b8ee 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java @@ -24,14 +24,10 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -@UriEndpoint(scheme = "nats", title = "Nats", syntax = "nats:host", label = "messaging") +@UriEndpoint(scheme = "nats", title = "Nats", syntax = "nats:servers", label = "messaging", consumerClass = NatsConsumer.class) public class NatsEndpoint extends DefaultEndpoint { - private static final Logger LOG = LoggerFactory.getLogger(NatsEndpoint.class); - @UriParam private NatsConfiguration configuration; @@ -56,12 +52,9 @@ public class NatsEndpoint extends DefaultEndpoint { @Override public boolean isSingleton() { - return false; + return true; } - /** - * The nats Configuration - */ public NatsConfiguration getNatsConfiguration() { return configuration; } http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java index 92e2424..89b2b23 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java @@ -43,10 +43,12 @@ public class NatsProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws Exception { NatsConfiguration config = getEndpoint().getNatsConfiguration(); - connection.publish(config.getTopic(), exchange.getIn().getBody(String.class).getBytes()); + String body = exchange.getIn().getMandatoryBody(String.class); + + LOG.debug("Publishing to topic: {}", config.getTopic()); + connection.publish(config.getTopic(), body.getBytes()); } - @Override protected void doStart() throws Exception { super.doStart(); @@ -62,17 +64,15 @@ public class NatsProducer extends DefaultProducer { LOG.debug("Stopping Nats Producer"); LOG.debug("Closing Nats Connection"); - if (connection.isConnected()) { + if (connection != null && connection.isConnected()) { connection.close(); } } - private Connection getConnection() throws IOException, InterruptedException { - Properties prop = getEndpoint().getNatsConfiguration().createProperties(); connection = Connection.connect(prop); - return connection; } + }