Repository: camel Updated Branches: refs/heads/master f229d67fe -> bd666dbc0
#CAMEL-3195 Allow camel to send custom xmpp PubSub packet to a xmpp endpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b1ff1aa Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b1ff1aa Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b1ff1aa Branch: refs/heads/master Commit: 1b1ff1aa515a2a6b964f6e98d85cec0e3dff171c Parents: f229d67 Author: Hugo Freire <hfre...@abajar.com> Authored: Thu Feb 6 15:59:14 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 4 08:29:33 2014 +0100 ---------------------------------------------------------------------- .../camel/component/xmpp/XmppBinding.java | 62 ++++++++++++++++---- .../camel/component/xmpp/XmppConstants.java | 1 + .../camel/component/xmpp/XmppConsumer.java | 16 +++++ .../camel/component/xmpp/XmppEndpoint.java | 38 ++++++++++-- .../camel/component/xmpp/XmppMessage.java | 38 ++++++++---- .../component/xmpp/XmppPubSubProducer.java | 60 +++++++++++++++++++ .../component/xmpp/UriConfigurationTest.java | 15 +++++ 7 files changed, 204 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java index 58debd7..1f43527 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java @@ -25,6 +25,9 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ObjectHelper; import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; +import org.jivesoftware.smack.packet.Presence; +import org.jivesoftware.smackx.pubsub.packet.PubSub; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,31 +89,68 @@ public class XmppBinding { message.setProperty("exchangeId", id); } } + + /** + * Populates the given XMPP packet from the inbound exchange + */ + public void populateXmppPacket(Packet packet, Exchange exchange) { + Set<Map.Entry<String, Object>> entries = exchange.getIn().getHeaders().entrySet(); + for (Map.Entry<String, Object> entry : entries) { + String name = entry.getKey(); + Object value = entry.getValue(); + if (!headerFilterStrategy.applyFilterToCamelHeaders(name, value, exchange)) { + try { + packet.setProperty(name, value); + LOG.debug("Added property name: " + name + " value: " + value.toString()); + } catch (IllegalArgumentException iae) { + LOG.debug("Not adding property " + name + " to XMPP message due to " + iae); + } + } + } + String id = exchange.getExchangeId(); + if (id != null) { + packet.setProperty("exchangeId", id); + } + } + /** * Extracts the body from the XMPP message */ - public Object extractBodyFromXmpp(Exchange exchange, Message message) { - return message.getBody(); + public Object extractBodyFromXmpp(Exchange exchange, Packet xmppPacket) { + return (xmppPacket instanceof Message)? GetMessageBody((Message)xmppPacket): xmppPacket; + } + + private Object GetMessageBody(Message message) { + String messageBody = message.getBody(); + if(messageBody == null) //probably a pubsub message + return message; + return messageBody; } - public Map<String, Object> extractHeadersFromXmpp(Message xmppMessage, Exchange exchange) { + public Map<String, Object> extractHeadersFromXmpp(Packet xmppPacket, Exchange exchange) { Map<String, Object> answer = new HashMap<String, Object>(); - for (String name : xmppMessage.getPropertyNames()) { - Object value = xmppMessage.getProperty(name); + for (String name : xmppPacket.getPropertyNames()) { + Object value = xmppPacket.getProperty(name); if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) { answer.put(name, value); } } - answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType()); - answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject()); - answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread()); - answer.put(XmppConstants.FROM, xmppMessage.getFrom()); - answer.put(XmppConstants.PACKET_ID, xmppMessage.getPacketID()); - answer.put(XmppConstants.TO, xmppMessage.getTo()); + if(xmppPacket instanceof Message) { + Message xmppMessage = (Message)xmppPacket; + answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType()); + answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject()); + answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread()); + } else if(xmppPacket instanceof PubSub) { + PubSub pubsubPacket = (PubSub)xmppPacket; + answer.put(XmppConstants.MESSAGE_TYPE, pubsubPacket.getType()); + } + answer.put(XmppConstants.FROM, xmppPacket.getFrom()); + answer.put(XmppConstants.PACKET_ID, xmppPacket.getPacketID()); + answer.put(XmppConstants.TO, xmppPacket.getTo()); return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java index 68649ea..1251e9c 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java @@ -28,4 +28,5 @@ public interface XmppConstants { String FROM = "CamelXmppFrom"; String PACKET_ID = "CamelXmppPacketID"; String TO = "CamelXmppTo"; + String docHeader = "doc"; } http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java index 657c504..ef57fda 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java @@ -31,9 +31,12 @@ import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.filter.AndFilter; +import org.jivesoftware.smack.filter.MessageTypeFilter; +import org.jivesoftware.smack.filter.OrFilter; import org.jivesoftware.smack.filter.PacketTypeFilter; import org.jivesoftware.smack.filter.ToContainsFilter; import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Message.Type; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.Presence; import org.jivesoftware.smackx.muc.DiscussionHistory; @@ -79,6 +82,14 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes chatManager = connection.getChatManager(); chatManager.addChatListener(this); + + OrFilter pubsubPacketFilter = new OrFilter(); + if(endpoint.isPubsub()){ + //xep-0060: pubsub#notification_type can be 'headline' or 'normal' + pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.headline)); + pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.normal)); + connection.addPacketListener(this, pubsubPacketFilter); + } if (endpoint.getRoom() == null) { privateChat = chatManager.getThreadChat(endpoint.getChatId()); @@ -209,6 +220,10 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes } Exchange exchange = endpoint.createExchange(message); + + if(endpoint.isDoc() == true) { + exchange.getIn().setHeader(XmppConstants.docHeader, message); + } try { getProcessor().process(exchange); } catch (Exception e) { @@ -222,4 +237,5 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes } } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java index 8c3f853..a20799b 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java @@ -62,6 +62,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg private String nickname; private String serviceName; private XMPPConnection connection; + private boolean pubsub = false; + //Set a doc header on the IN message containing a Document form of the incoming packet; + //default is true if pubsub is true, otherwise false + private boolean doc = false; private boolean testConnectionOnStartup = true; private int connectionPollDelay = 10; @@ -81,6 +85,9 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg if (room != null) { return createGroupChatProducer(); } else { + if(isPubsub() == true) { + return createPubSubProducer(); + } if (getParticipant() == null) { throw new IllegalArgumentException("No room or participant configured on this endpoint: " + this); } @@ -95,6 +102,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg public Producer createPrivateChatProducer(String participant) throws Exception { return new XmppPrivateChatProducer(this, participant); } + + public Producer createPubSubProducer() throws Exception { + return new XmppPubSubProducer(this); + } public Consumer createConsumer(Processor processor) throws Exception { XmppConsumer answer = new XmppConsumer(this, processor); @@ -107,14 +118,14 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg return createExchange(pattern, null); } - public Exchange createExchange(Message message) { - return createExchange(getExchangePattern(), message); + public Exchange createExchange(Packet packet) { + return createExchange(getExchangePattern(), packet); } - private Exchange createExchange(ExchangePattern pattern, Message message) { + private Exchange createExchange(ExchangePattern pattern, Packet packet) { Exchange exchange = new DefaultExchange(this, getExchangePattern()); exchange.setProperty(Exchange.BINDING, getBinding()); - exchange.setIn(new XmppMessage(message)); + exchange.setIn(new XmppMessage(packet)); return exchange; } @@ -369,6 +380,25 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg this.connectionPollDelay = connectionPollDelay; } + public void setPubsub(boolean pubsub) { + this.pubsub = pubsub; + if(pubsub == true) { + setDoc(true); + } + } + + public boolean isPubsub() { + return pubsub; + } + + public void setDoc(boolean doc) { + this.doc = doc; + } + + public boolean isDoc() { + return doc; + } + // Implementation methods // ------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java index 016f1fd..b75243c 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.ExchangeHelper; import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; /** * Represents a {@link org.apache.camel.Message} for working with XMPP @@ -28,20 +29,24 @@ import org.jivesoftware.smack.packet.Message; * @version */ public class XmppMessage extends DefaultMessage { - private Message xmppMessage; + private Packet xmppPacket; public XmppMessage() { this(new Message()); } public XmppMessage(Message jmsMessage) { - this.xmppMessage = jmsMessage; + this.xmppPacket = jmsMessage; } - + + public XmppMessage(Packet jmsMessage) { + this.xmppPacket = jmsMessage; + } + @Override public String toString() { - if (xmppMessage != null) { - return "XmppMessage: " + xmppMessage; + if (xmppPacket != null) { + return "XmppMessage: " + xmppPacket; } else { return "XmppMessage: " + getBody(); } @@ -51,11 +56,22 @@ public class XmppMessage extends DefaultMessage { * Returns the underlying XMPP message */ public Message getXmppMessage() { - return xmppMessage; + return (xmppPacket instanceof Message) ? (Message)xmppPacket : null; } public void setXmppMessage(Message xmppMessage) { - this.xmppMessage = xmppMessage; + this.xmppPacket = xmppMessage; + } + + /** + * Returns the underlying XMPP packet + */ + public Packet getXmppPacket() { + return xmppPacket; + } + + public void setXmppPacket(Packet xmppPacket) { + this.xmppPacket = xmppPacket; } @Override @@ -65,10 +81,10 @@ public class XmppMessage extends DefaultMessage { @Override protected Object createBody() { - if (xmppMessage != null) { + if (xmppPacket != null) { XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class); if (binding != null) { - return binding.extractBodyFromXmpp(getExchange(), xmppMessage); + return (getHeader(XmppConstants.docHeader) == null) ? binding.extractBodyFromXmpp(getExchange(), xmppPacket): getHeader(XmppConstants.docHeader); } } return null; @@ -76,10 +92,10 @@ public class XmppMessage extends DefaultMessage { @Override protected void populateInitialHeaders(Map<String, Object> map) { - if (xmppMessage != null) { + if (xmppPacket != null) { XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class); if (binding != null) { - map.putAll(binding.extractHeadersFromXmpp(xmppMessage, getExchange())); + map.putAll(binding.extractHeadersFromXmpp(xmppPacket, getExchange())); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java new file mode 100644 index 0000000..20d46a4 --- /dev/null +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java @@ -0,0 +1,60 @@ +package org.apache.camel.component.xmpp; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.impl.DefaultProducer; +import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smackx.pubsub.packet.PubSub; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class XmppPubSubProducer extends DefaultProducer { + private static final transient Logger LOG = LoggerFactory.getLogger(XmppPrivateChatProducer.class); + private final XmppEndpoint endpoint; + private XMPPConnection connection; + + public XmppPubSubProducer(XmppEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + LOG.debug("Creating XmppPresenceProducer"); + } + + public void process(Exchange exchange) throws Exception { + try { + if (connection == null) { + connection = endpoint.createConnection(); + } + + // make sure we are connected + if (!connection.isConnected()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection)); + } + connection.connect(); + } + } catch (XMPPException e) { + throw new RuntimeExchangeException("Cannot connect to XMPP Server: " + + ((connection != null) ? XmppEndpoint.getConnectionMessage(connection): endpoint.getHost()), exchange, e); + } + + try { + Object body = exchange.getIn().getBody(Object.class); + if(body instanceof PubSub) { + PubSub pubsubpacket = (PubSub) body; + endpoint.getBinding().populateXmppPacket(pubsubpacket, exchange); + exchange.getIn().setHeader(XmppConstants.docHeader, pubsubpacket); + connection.sendPacket(pubsubpacket); + } else { + throw new Exception("Message does not contain a pubsub packet"); + } + } catch (XMPPException xmppe) { + throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser() + + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe); + } catch (Exception e) { + throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser() + + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java index abb9699..39ef33f 100644 --- a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java +++ b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java @@ -69,4 +69,19 @@ public class UriConfigurationTest extends Assert { assertEquals("Camel", xmppEndpoint.getResource()); } + + @Test + public void testPubSubConfiguration() throws Exception { + Endpoint endpoint = context.getEndpoint("xmpp://camel-user@localhost:123?password=secret&pubsub=true"); + assertTrue("Endpoint not an XmppEndpoint: " + endpoint, endpoint instanceof XmppEndpoint); + XmppEndpoint xmppEndpoint = (XmppEndpoint) endpoint; + + assertEquals("localhost", xmppEndpoint.getHost()); + assertEquals(123, xmppEndpoint.getPort()); + assertEquals("camel-user", xmppEndpoint.getUser()); + assertEquals("secret", xmppEndpoint.getPassword()); + assertEquals(true, xmppEndpoint.isPubsub()); + assertEquals(true, xmppEndpoint.isDoc()); + } + }