Repository: camel
Updated Branches:
  refs/heads/master 4e0437c40 -> e50136b7d


Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e50136b7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e50136b7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e50136b7

Branch: refs/heads/master
Commit: e50136b7d9ed6f217346ec4a6b4f13e2fa17e52b
Parents: 4e0437c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Jan 19 10:07:17 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Jan 19 10:08:35 2016 +0100

----------------------------------------------------------------------
 .../camel/component/nats/NatsConfiguration.java | 71 ++++++++++++--------
 .../camel/component/nats/NatsConsumer.java      | 19 +++---
 .../camel/component/nats/NatsEndpoint.java      | 11 +--
 .../camel/component/nats/NatsProducer.java      | 12 ++--
 4 files changed, 59 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 43182b9..260d1a7 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -26,9 +26,11 @@ import org.apache.camel.spi.UriPath;
 @UriParams
 public class NatsConfiguration {
 
-       @UriPath @Metadata(required = "true")
+    @UriPath
+    @Metadata(required = "true")
     private String servers;
-    @UriParam @Metadata(required = "true")
+    @UriParam
+    @Metadata(required = "true")
     private String topic;
     @UriParam(defaultValue = "true")
     private boolean reconnect = true;
@@ -52,143 +54,156 @@ public class NatsConfiguration {
     private String maxMessages;
     @UriParam(label = "consumer", defaultValue = "10")
     private int poolSize = 10;
-    
+
     /**
-     * The Nats servers
+     * URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers.
      */
     public String getServers() {
         return servers;
     }
+
     public void setServers(String servers) {
         this.servers = servers;
     }
-    
+
     /**
      * The name of topic we want to use
-     */   
+     */
     public String getTopic() {
         return topic;
     }
+
     public void setTopic(String topic) {
         this.topic = topic;
     }
-        
+
     /**
      * Whether or not using reconnection feature
-     */   
+     */
     public boolean getReconnect() {
         return reconnect;
     }
+
     public void setReconnect(boolean reconnect) {
         this.reconnect = reconnect;
     }
-     
+
     /**
      * Whether or not running in pedantic mode (this affects performace)
-     */  
+     */
     public boolean getPedantic() {
         return pedantic;
     }
+
     public void setPedantic(boolean pedantic) {
         this.pedantic = pedantic;
     }
-    
+
     /**
      * Whether or not running in verbose mode
-     */  
+     */
     public boolean getVerbose() {
         return verbose;
     }
+
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
-    
+
     /**
      * Whether or not using SSL
-     */ 
+     */
     public boolean getSsl() {
         return ssl;
     }
+
     public void setSsl(boolean ssl) {
         this.ssl = ssl;
     }
-    
+
     /**
      * Waiting time before attempts reconnection (in milliseconds)
-     */ 
+     */
     public int getReconnectTimeWait() {
         return reconnectTimeWait;
     }
+
     public void setReconnectTimeWait(int reconnectTimeWait) {
         this.reconnectTimeWait = reconnectTimeWait;
     }
-    
+
     /**
      * Max reconnection attempts
-     */ 
+     */
     public int getMaxReconnectAttempts() {
         return maxReconnectAttempts;
     }
+
     public void setMaxReconnectAttempts(int maxReconnectAttempts) {
         this.maxReconnectAttempts = maxReconnectAttempts;
     }
-    
+
     /**
      * Ping interval to be aware if connection is still alive (in milliseconds)
      */
     public int getPingInterval() {
         return pingInterval;
     }
+
     public void setPingInterval(int pingInterval) {
         this.pingInterval = pingInterval;
     }
-    
+
     /**
      * Whether or not randomizing the order of servers for the connection attempts
      */
     public boolean getNoRandomizeServers() {
         return noRandomizeServers;
     }
+
     public void setNoRandomizeServers(boolean noRandomizeServers) {
         this.noRandomizeServers = noRandomizeServers;
     }
-    
+
     /**
      * The Queue name if we are using nats for a queue configuration
      */
     public String getQueueName() {
         return queueName;
     }
+
     public void setQueueName(String queueName) {
         this.queueName = queueName;
     }
-    
+
     /**
      * Stop receiving messages from a topic we are subscribing to after maxMessages 
      */
     public String getMaxMessages() {
         return maxMessages;
     }
+
     public void setMaxMessages(String maxMessages) {
         this.maxMessages = maxMessages;
     }
-    
+
     /**
      * Consumer pool size
      */
     public int getPoolSize() {
         return poolSize;
     }
+
     public void setPoolSize(int poolSize) {
         this.poolSize = poolSize;
     }
-    
+
     private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
         if (value != null) {
             props.put(key, value);
         }
     }
-    
+
     public Properties createProperties() {
         Properties props = new Properties();
         addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URI, splitServers());
@@ -202,18 +217,18 @@ public class NatsConfiguration {
         addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_DONT_RANDOMIZE_SERVERS, getNoRandomizeServers());
         return props;
     }
-    
+
     public Properties createSubProperties() {
         Properties props = new Properties();
         addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
         addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
         return props;
     }
-    
+
     private String splitServers() {
         StringBuilder servers = new StringBuilder();
         String prefix = "nats://";
-        
+
         String[] pieces = getServers().split(",");
         for (int i = 0; i < pieces.length; i++) {
             if (i < pieces.length - 1) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index e63f485..029ab13 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -32,15 +32,13 @@ public class NatsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
 
-    protected ExecutorService executor;
-    private final NatsEndpoint endpoint;
     private final Processor processor;
+    private ExecutorService executor;
     private Connection connection;
     private int sid;
 
     public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.endpoint = endpoint;
         this.processor = processor;
     }
 
@@ -53,18 +51,20 @@ public class NatsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         LOG.debug("Starting Nats Consumer");
-        executor = endpoint.createExecutor();
+        executor = getEndpoint().createExecutor();
 
         LOG.debug("Getting Nats Connection");
         connection = getConnection();
 
         executor.submit(new NatsConsumingTask(connection, getEndpoint().getNatsConfiguration()));
-
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
+
+        // TODO: Should we not unsubscribe first?
+
         LOG.debug("Stopping Nats Consumer");
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
@@ -82,17 +82,14 @@ public class NatsConsumer extends DefaultConsumer {
     }
 
     private Connection getConnection() throws IOException, InterruptedException {
-
         Properties prop = getEndpoint().getNatsConfiguration().createProperties();
         connection = Connection.connect(prop);
-
         return connection;
     }
 
     class NatsConsumingTask implements Runnable {
 
         private final Connection connection;
-        
         private final NatsConfiguration configuration;
 
         public NatsConsumingTask(Connection connection, NatsConfiguration configuration) {
@@ -105,7 +102,7 @@ public class NatsConsumer extends DefaultConsumer {
             try {
                 sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() {
                     public void execute(String msg) {
-                        LOG.debug("Received Message: " + msg);
+                        LOG.debug("Received Message: {}", msg);
                         Exchange exchange = getEndpoint().createExchange();
                         exchange.getIn().setBody(msg);
                         exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
@@ -117,8 +114,8 @@ public class NatsConsumer extends DefaultConsumer {
                         }
                     }
                 });
-            } catch (IOException e1) {
-                getExceptionHandler().handleException("Error during processing", e1);
+            } catch (Throwable e) {
+                getExceptionHandler().handleException("Error during processing", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
index 6a076d4..160b8ee 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
@@ -24,14 +24,10 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-@UriEndpoint(scheme = "nats", title = "Nats", syntax = "nats:host", label = "messaging")
+@UriEndpoint(scheme = "nats", title = "Nats", syntax = "nats:servers", label = "messaging", consumerClass = NatsConsumer.class)
 public class NatsEndpoint extends DefaultEndpoint {
 
-    private static final Logger LOG = LoggerFactory.getLogger(NatsEndpoint.class);
-
     @UriParam
     private NatsConfiguration configuration;
     
@@ -56,12 +52,9 @@ public class NatsEndpoint extends DefaultEndpoint {
 
     @Override
     public boolean isSingleton() {
-        return false;
+        return true;
     }
     
-    /**
-     * The nats Configuration
-     */
     public NatsConfiguration getNatsConfiguration() {
         return configuration;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e50136b7/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 92e2424..89b2b23 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -43,10 +43,12 @@ public class NatsProducer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         NatsConfiguration config = getEndpoint().getNatsConfiguration();
-        connection.publish(config.getTopic(), exchange.getIn().getBody(String.class).getBytes());
+        String body = exchange.getIn().getMandatoryBody(String.class);
+
+        LOG.debug("Publishing to topic: {}", config.getTopic());
+        connection.publish(config.getTopic(), body.getBytes());
     }
     
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -62,17 +64,15 @@ public class NatsProducer extends DefaultProducer {
         LOG.debug("Stopping Nats Producer");
         
         LOG.debug("Closing Nats Connection");
-        if (connection.isConnected()) {
+        if (connection != null && connection.isConnected()) {
             connection.close();
         }
     }
 
-    
     private Connection getConnection() throws IOException, InterruptedException {
-
         Properties prop = getEndpoint().getNatsConfiguration().createProperties();
         connection = Connection.connect(prop);
-
         return connection;
     }
+
 }

Reply via email to