Updated Branches: refs/heads/master 7fe4af53a -> 053112878
CAMEL-7148 Added the ability to send messages to several participants with thanks to Denis Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/05311287 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/05311287 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/05311287 Branch: refs/heads/master Commit: 0531128785c5a76aae97308491a7bf619e1f8c2a Parents: f8b6d5b Author: Willem Jiang <willem.ji...@gmail.com> Authored: Thu Feb 13 20:38:24 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Feb 13 20:38:42 2014 +0800 ---------------------------------------------------------------------- .../camel/component/xmpp/XmppComponent.java | 15 ++++++++-- .../component/xmpp/XmppPrivateChatProducer.java | 31 +++++++++++++------- ...outeMultipleProducersSingleConsumerTest.java | 1 + 3 files changed, 33 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/05311287/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java index 1f0656d..dc0e524 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java @@ -17,6 +17,7 @@ package org.apache.camel.component.xmpp; import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -38,9 +39,10 @@ public class XmppComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - if (endpointCache.containsKey(uri)) { + String cacheKey = extractCacheKeyFromUri(uri); + if (endpointCache.containsKey(cacheKey)) { LOG.debug("Using cached endpoint for URI {}", URISupport.sanitizeUri(uri)); - return endpointCache.get(uri); + return endpointCache.get(cacheKey); } LOG.debug("Creating new endpoint for URI {}", URISupport.sanitizeUri(uri)); @@ -64,7 +66,7 @@ public class XmppComponent extends DefaultComponent { } } - endpointCache.put(uri, endpoint); + endpointCache.put(cacheKey, endpoint); return endpoint; } @@ -75,4 +77,11 @@ public class XmppComponent extends DefaultComponent { endpointCache.clear(); } + private String extractCacheKeyFromUri(String uri) throws URISyntaxException { + System.out.println("URI " + uri); + URI u = new URI(uri); + String result = u.getScheme() + "://" + u.getHost() + u.getPort() + u.getQuery(); + System.out.println("Result " + result); + return result; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/05311287/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java index 4098ee4..33644d0 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java @@ -62,45 +62,54 @@ public class XmppPrivateChatProducer extends DefaultProducer { throw new RuntimeException("Could not connect to XMPP server.", e); } + String participant = exchange.getIn().getHeader(XmppConstants.TO, String.class); + String thread = endpoint.getChatId(); + if (participant == null) { + participant = getParticipant(); + } else { + thread = "Chat:" + participant + ":" + endpoint.getUser(); + } + ChatManager chatManager = connection.getChatManager(); - Chat chat = getOrCreateChat(chatManager); + Chat chat = getOrCreateChat(chatManager, participant, thread); Message message = null; try { message = new Message(); - message.setTo(getParticipant()); - message.setThread(endpoint.getChatId()); + + message.setTo(participant); + message.setThread(thread); message.setType(Message.Type.normal); endpoint.getBinding().populateXmppMessage(message, exchange); if (LOG.isDebugEnabled()) { - LOG.debug("Sending XMPP message to {} from {} : {}", new Object[]{endpoint.getParticipant(), endpoint.getUser(), message.getBody()}); + LOG.debug("Sending XMPP message to {} from {} : {}", new Object[]{participant, endpoint.getUser(), message.getBody()}); } chat.sendMessage(message); } catch (XMPPException xmppe) { - throw new RuntimeExchangeException("Could not send XMPP message: to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message + throw new RuntimeExchangeException("Could not send XMPP message: to " + participant + " from " + endpoint.getUser() + " : " + message + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe); } catch (Exception e) { - throw new RuntimeExchangeException("Could not send XMPP message to " + endpoint.getParticipant() + " from " + endpoint.getUser() + " : " + message + throw new RuntimeExchangeException("Could not send XMPP message to " + participant + " from " + endpoint.getUser() + " : " + message + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e); } } - private synchronized Chat getOrCreateChat(ChatManager chatManager) { + private synchronized Chat getOrCreateChat(ChatManager chatManager, final String participant, String thread) { if (LOG.isTraceEnabled()) { LOG.trace("Looking for existing chat instance with thread ID {}", endpoint.getChatId()); } - Chat chat = chatManager.getThreadChat(endpoint.getChatId()); + Chat chat = chatManager.getThreadChat(thread); if (chat == null) { if (LOG.isTraceEnabled()) { - LOG.trace("Creating new chat instance with thread ID {}", endpoint.getChatId()); + LOG.trace("Creating new chat instance with thread ID {}", thread); } - chat = chatManager.createChat(getParticipant(), endpoint.getChatId(), new MessageListener() { + chat = chatManager.createChat(participant, thread, new MessageListener() { public void processMessage(Chat chat, Message message) { // not here to do conversation if (LOG.isDebugEnabled()) { LOG.debug("Received and discarding message from {} : {}" - , getParticipant(), message.getBody()); + , participant, message.getBody()); } } }); http://git-wip-us.apache.org/repos/asf/camel/blob/05311287/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteMultipleProducersSingleConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteMultipleProducersSingleConsumerTest.java b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteMultipleProducersSingleConsumerTest.java index f579898..7bf7da0 100644 --- a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteMultipleProducersSingleConsumerTest.java +++ b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteMultipleProducersSingleConsumerTest.java @@ -58,6 +58,7 @@ public class XmppRouteMultipleProducersSingleConsumerTest extends CamelTestSuppo .to(getProducer2Uri()); from(getConsumerUri()) + .removeHeader(XmppConstants.TO) .to(getConsumerUri()); from(getProducer1Uri())