Repository: camel
Updated Branches:
  refs/heads/master f229d67fe -> bd666dbc0


#CAMEL-3195 Allow Camel to send a custom XMPP PubSub packet to an XMPP endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b1ff1aa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b1ff1aa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b1ff1aa

Branch: refs/heads/master
Commit: 1b1ff1aa515a2a6b964f6e98d85cec0e3dff171c
Parents: f229d67
Author: Hugo Freire <hfre...@abajar.com>
Authored: Thu Feb 6 15:59:14 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Dec 4 08:29:33 2014 +0100

----------------------------------------------------------------------
 .../camel/component/xmpp/XmppBinding.java       | 62 ++++++++++++++++----
 .../camel/component/xmpp/XmppConstants.java     |  1 +
 .../camel/component/xmpp/XmppConsumer.java      | 16 +++++
 .../camel/component/xmpp/XmppEndpoint.java      | 38 ++++++++++--
 .../camel/component/xmpp/XmppMessage.java       | 38 ++++++++----
 .../component/xmpp/XmppPubSubProducer.java      | 60 +++++++++++++++++++
 .../component/xmpp/UriConfigurationTest.java    | 15 +++++
 7 files changed, 204 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
index 58debd7..1f43527 100644
--- 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
@@ -25,6 +25,9 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ObjectHelper;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Packet;
+import org.jivesoftware.smack.packet.Presence;
+import org.jivesoftware.smackx.pubsub.packet.PubSub;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,31 +89,68 @@ public class XmppBinding {
             message.setProperty("exchangeId", id);
         }
     }
+    
+    /**
+     * Populates the given XMPP packet from the inbound exchange
+     */
+    public void populateXmppPacket(Packet packet, Exchange exchange) {
+        Set<Map.Entry<String, Object>> entries = 
exchange.getIn().getHeaders().entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
+            String name = entry.getKey();
+            Object value = entry.getValue();
+            if (!headerFilterStrategy.applyFilterToCamelHeaders(name, value, 
exchange)) {
+                    try {
+                       packet.setProperty(name, value);
+                        LOG.debug("Added property name: " + name + " value: " 
+ value.toString());
+                    } catch (IllegalArgumentException iae) {
+                        LOG.debug("Not adding property " + name + " to XMPP 
message due to " + iae);
+                    }
+                }
+            }        
+        String id = exchange.getExchangeId();
+        if (id != null) {
+               packet.setProperty("exchangeId", id);
+        }
+    }
+    
 
     /**
      * Extracts the body from the XMPP message
      */
-    public Object extractBodyFromXmpp(Exchange exchange, Message message) {
-        return message.getBody();
+    public Object extractBodyFromXmpp(Exchange exchange, Packet xmppPacket) {
+        return (xmppPacket instanceof Message)? 
GetMessageBody((Message)xmppPacket): xmppPacket;
+    }
+    
+    private Object GetMessageBody(Message message) {
+       String messageBody = message.getBody();
+       if(messageBody == null) //probably a pubsub message
+               return message;
+       return messageBody;
     }
 
-    public Map<String, Object> extractHeadersFromXmpp(Message xmppMessage, 
Exchange exchange) {
+    public Map<String, Object> extractHeadersFromXmpp(Packet xmppPacket, 
Exchange exchange) {
         Map<String, Object> answer = new HashMap<String, Object>();
 
-        for (String name : xmppMessage.getPropertyNames()) {
-            Object value = xmppMessage.getProperty(name);
+        for (String name : xmppPacket.getPropertyNames()) {
+            Object value = xmppPacket.getProperty(name);
 
             if (!headerFilterStrategy.applyFilterToExternalHeaders(name, 
value, exchange)) {
                 answer.put(name, value);
             }
         }
 
-        answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
-        answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
-        answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
-        answer.put(XmppConstants.FROM, xmppMessage.getFrom());
-        answer.put(XmppConstants.PACKET_ID, xmppMessage.getPacketID());
-        answer.put(XmppConstants.TO, xmppMessage.getTo());
+        if(xmppPacket instanceof Message) {
+            Message xmppMessage = (Message)xmppPacket;
+            answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
+            answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
+            answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
+        } else if(xmppPacket instanceof PubSub) {
+               PubSub pubsubPacket = (PubSub)xmppPacket;
+            answer.put(XmppConstants.MESSAGE_TYPE, pubsubPacket.getType());
+        }
+        answer.put(XmppConstants.FROM, xmppPacket.getFrom());
+        answer.put(XmppConstants.PACKET_ID, xmppPacket.getPacketID());
+        answer.put(XmppConstants.TO, xmppPacket.getTo());
                 
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
index 68649ea..1251e9c 100644
--- 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
@@ -28,4 +28,5 @@ public interface XmppConstants {
     String FROM = "CamelXmppFrom";
     String PACKET_ID = "CamelXmppPacketID";
     String TO = "CamelXmppTo";
+    String docHeader = "doc";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
index 657c504..ef57fda 100644
--- 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
@@ -31,9 +31,12 @@ import org.jivesoftware.smack.SmackConfiguration;
 import org.jivesoftware.smack.XMPPConnection;
 import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.filter.AndFilter;
+import org.jivesoftware.smack.filter.MessageTypeFilter;
+import org.jivesoftware.smack.filter.OrFilter;
 import org.jivesoftware.smack.filter.PacketTypeFilter;
 import org.jivesoftware.smack.filter.ToContainsFilter;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Message.Type;
 import org.jivesoftware.smack.packet.Packet;
 import org.jivesoftware.smack.packet.Presence;
 import org.jivesoftware.smackx.muc.DiscussionHistory;
@@ -79,6 +82,14 @@ public class XmppConsumer extends DefaultConsumer implements 
PacketListener, Mes
 
         chatManager = connection.getChatManager();
         chatManager.addChatListener(this);
+        
+        OrFilter pubsubPacketFilter = new OrFilter();
+        if(endpoint.isPubsub()){
+               //xep-0060: pubsub#notification_type can be 'headline' or 
'normal'
+               pubsubPacketFilter.addFilter(new 
MessageTypeFilter(Type.headline));
+               pubsubPacketFilter.addFilter(new 
MessageTypeFilter(Type.normal));
+               connection.addPacketListener(this, pubsubPacketFilter);
+        }
 
         if (endpoint.getRoom() == null) {
             privateChat = chatManager.getThreadChat(endpoint.getChatId());
@@ -209,6 +220,10 @@ public class XmppConsumer extends DefaultConsumer 
implements PacketListener, Mes
         }
 
         Exchange exchange = endpoint.createExchange(message);
+
+        if(endpoint.isDoc() == true) {
+               exchange.getIn().setHeader(XmppConstants.docHeader, message);
+        }
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
@@ -222,4 +237,5 @@ public class XmppConsumer extends DefaultConsumer 
implements PacketListener, Mes
             }
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
index 8c3f853..a20799b 100644
--- 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
@@ -62,6 +62,10 @@ public class XmppEndpoint extends DefaultEndpoint implements 
HeaderFilterStrateg
     private String nickname;
     private String serviceName;
     private XMPPConnection connection;
+    private boolean pubsub = false;
+    //Set a doc header on the IN message containing a Document form of the 
incoming packet; 
+    //default is true if pubsub is true, otherwise false
+    private boolean doc = false;
     private boolean testConnectionOnStartup = true;
     private int connectionPollDelay = 10;
 
@@ -81,6 +85,9 @@ public class XmppEndpoint extends DefaultEndpoint implements 
HeaderFilterStrateg
         if (room != null) {
             return createGroupChatProducer();
         } else {
+               if(isPubsub() == true) {
+                       return createPubSubProducer();
+               }
             if (getParticipant() == null) {
                 throw new IllegalArgumentException("No room or participant 
configured on this endpoint: " + this);
             }
@@ -95,6 +102,10 @@ public class XmppEndpoint extends DefaultEndpoint 
implements HeaderFilterStrateg
     public Producer createPrivateChatProducer(String participant) throws 
Exception {
         return new XmppPrivateChatProducer(this, participant);
     }
+    
+    public Producer createPubSubProducer() throws Exception {
+       return new XmppPubSubProducer(this);
+    }
 
     public Consumer createConsumer(Processor processor) throws Exception {
         XmppConsumer answer = new XmppConsumer(this, processor);
@@ -107,14 +118,14 @@ public class XmppEndpoint extends DefaultEndpoint 
implements HeaderFilterStrateg
         return createExchange(pattern, null);
     }
 
-    public Exchange createExchange(Message message) {
-        return createExchange(getExchangePattern(), message);
+    public Exchange createExchange(Packet packet) {
+        return createExchange(getExchangePattern(), packet);
     }
 
-    private Exchange createExchange(ExchangePattern pattern, Message message) {
+    private Exchange createExchange(ExchangePattern pattern, Packet packet) {
         Exchange exchange = new DefaultExchange(this, getExchangePattern());
         exchange.setProperty(Exchange.BINDING, getBinding());
-        exchange.setIn(new XmppMessage(message));
+        exchange.setIn(new XmppMessage(packet));
         return exchange;
     }
 
@@ -369,6 +380,25 @@ public class XmppEndpoint extends DefaultEndpoint 
implements HeaderFilterStrateg
         this.connectionPollDelay = connectionPollDelay;
     }
 
+       public void setPubsub(boolean pubsub) {
+               this.pubsub = pubsub;
+               if(pubsub == true) {
+                       setDoc(true);
+               }
+       }
+
+       public boolean isPubsub() {
+               return pubsub;
+       }
+
+       public void setDoc(boolean doc) {
+               this.doc = doc;
+       }
+
+       public boolean isDoc() {
+               return doc;
+       }
+
     // Implementation methods
     // 
-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
index 016f1fd..b75243c 100644
--- 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ExchangeHelper;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Packet;
 
 /**
  * Represents a {@link org.apache.camel.Message} for working with XMPP
@@ -28,20 +29,24 @@ import org.jivesoftware.smack.packet.Message;
  * @version 
  */
 public class XmppMessage extends DefaultMessage {
-    private Message xmppMessage;
+    private Packet xmppPacket;
 
     public XmppMessage() {
         this(new Message());
     }
 
     public XmppMessage(Message jmsMessage) {
-        this.xmppMessage = jmsMessage;
+        this.xmppPacket = jmsMessage;
     }
-
+    
+    public XmppMessage(Packet jmsMessage) {
+        this.xmppPacket = jmsMessage;
+    }
+    
     @Override
     public String toString() {
-        if (xmppMessage != null) {
-            return "XmppMessage: " + xmppMessage;
+        if (xmppPacket != null) {
+            return "XmppMessage: " + xmppPacket;
         } else {
             return "XmppMessage: " + getBody();
         }
@@ -51,11 +56,22 @@ public class XmppMessage extends DefaultMessage {
      * Returns the underlying XMPP message
      */
     public Message getXmppMessage() {
-        return xmppMessage;
+        return (xmppPacket instanceof Message) ? (Message)xmppPacket : null;
     }
 
     public void setXmppMessage(Message xmppMessage) {
-        this.xmppMessage = xmppMessage;
+        this.xmppPacket = xmppMessage;
+    }
+
+    /**
+     * Returns the underlying XMPP packet
+     */
+    public Packet getXmppPacket() {
+        return xmppPacket;
+    }
+
+    public void setXmppPacket(Packet xmppPacket) {
+        this.xmppPacket = xmppPacket;
     }
     
     @Override
@@ -65,10 +81,10 @@ public class XmppMessage extends DefaultMessage {
 
     @Override
     protected Object createBody() {
-        if (xmppMessage != null) {
+        if (xmppPacket != null) {
             XmppBinding binding = ExchangeHelper.getBinding(getExchange(), 
XmppBinding.class);
             if (binding != null) {
-                return binding.extractBodyFromXmpp(getExchange(), xmppMessage);
+                return (getHeader(XmppConstants.docHeader) == null) ? 
binding.extractBodyFromXmpp(getExchange(), xmppPacket): 
getHeader(XmppConstants.docHeader);
             }
         }
         return null;
@@ -76,10 +92,10 @@ public class XmppMessage extends DefaultMessage {
     
     @Override
     protected void populateInitialHeaders(Map<String, Object> map) {
-        if (xmppMessage != null) {
+        if (xmppPacket != null) {
             XmppBinding binding = ExchangeHelper.getBinding(getExchange(), 
XmppBinding.class);
             if (binding != null) {
-                map.putAll(binding.extractHeadersFromXmpp(xmppMessage, 
getExchange()));
+                map.putAll(binding.extractHeadersFromXmpp(xmppPacket, 
getExchange()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
new file mode 100644
index 0000000..20d46a4
--- /dev/null
+++ 
b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
@@ -0,0 +1,60 @@
+package org.apache.camel.component.xmpp;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.impl.DefaultProducer;
+import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
+import org.jivesoftware.smackx.pubsub.packet.PubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class XmppPubSubProducer extends DefaultProducer {
+       private static final transient Logger LOG = 
LoggerFactory.getLogger(XmppPrivateChatProducer.class);
+    private final XmppEndpoint endpoint;
+    private XMPPConnection connection;
+
+    public XmppPubSubProducer(XmppEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        LOG.debug("Creating XmppPresenceProducer");
+    }
+
+       public void process(Exchange exchange) throws Exception {
+        try {
+            if (connection == null) {
+                connection = endpoint.createConnection();
+            }
+
+            // make sure we are connected
+            if (!connection.isConnected()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reconnecting to: " + 
XmppEndpoint.getConnectionMessage(connection));
+                }
+                connection.connect();
+            }
+        } catch (XMPPException e) {
+            throw new RuntimeExchangeException("Cannot connect to XMPP Server: 
"
+                    + ((connection != null) ? 
XmppEndpoint.getConnectionMessage(connection): endpoint.getHost()), exchange, 
e);
+        }
+        
+        try {
+            Object body = exchange.getIn().getBody(Object.class);
+            if(body instanceof PubSub) {
+               PubSub pubsubpacket = (PubSub) body;
+                endpoint.getBinding().populateXmppPacket(pubsubpacket, 
exchange);
+               exchange.getIn().setHeader(XmppConstants.docHeader, 
pubsubpacket);
+               connection.sendPacket(pubsubpacket);
+            } else {
+                throw new Exception("Message does not contain a pubsub 
packet");               
+            }          
+        } catch (XMPPException xmppe) {
+            throw new RuntimeExchangeException("Cannot send XMPP pubsub: from 
" + endpoint.getUser()
+                    + " to: " + XmppEndpoint.getConnectionMessage(connection), 
exchange, xmppe);
+        } catch (Exception e) {
+            throw new RuntimeExchangeException("Cannot send XMPP pubsub: from 
" + endpoint.getUser()
+                    + " to: " + XmppEndpoint.getConnectionMessage(connection), 
exchange, e);
+        }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
 
b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
index abb9699..39ef33f 100644
--- 
a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
+++ 
b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
@@ -69,4 +69,19 @@ public class UriConfigurationTest extends Assert {
 
         assertEquals("Camel", xmppEndpoint.getResource());
     }
+    
+    @Test
+    public void testPubSubConfiguration() throws Exception {
+        Endpoint endpoint = 
context.getEndpoint("xmpp://camel-user@localhost:123?password=secret&pubsub=true");
+        assertTrue("Endpoint not an XmppEndpoint: " + endpoint, endpoint 
instanceof XmppEndpoint);
+        XmppEndpoint xmppEndpoint = (XmppEndpoint) endpoint;
+
+        assertEquals("localhost", xmppEndpoint.getHost());
+        assertEquals(123, xmppEndpoint.getPort());
+        assertEquals("camel-user", xmppEndpoint.getUser());
+        assertEquals("secret", xmppEndpoint.getPassword());
+        assertEquals(true, xmppEndpoint.isPubsub());
+        assertEquals(true, xmppEndpoint.isDoc());
+    }
+    
 }

Reply via email to