This is an automated email from the ASF dual-hosted git repository. ralaoui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-vysper.git
The following commit(s) were added to refs/heads/master by this push: new 8915a59 Remove LocalDeliveryUtils in favor of StanzaBroker 8915a59 is described below commit 8915a59097a6fb3cb970cd008324ec530c4963ae Author: Réda Housni Alaoui <reda.housniala...@gmail.com> AuthorDate: Sun Sep 1 17:11:40 2019 +0200 Remove LocalDeliveryUtils in favor of StanzaBroker --- .../vysper/xmpp/delivery/LocalDeliveryUtils.java | 58 ---------------- .../vysper/xmpp/delivery/StanzaReceiverRelay.java | 29 +++++--- .../delivery/failure/IgnoreFailureStrategy.java | 6 +- .../ReturnErrorToSenderFailureStrategy.java | 6 +- .../AbstractPresenceSpecializedHandler.java | 2 +- .../im/handler/PresenceAvailabilityHandler.java | 4 +- .../im/handler/PresenceSubscriptionHandler.java | 77 ++++++++++++++-------- .../modules/roster/handler/RosterIQHandler.java | 30 +++++---- .../vysper/xmpp/server/ServerRuntimeContext.java | 11 ++-- .../xmpp/delivery/StanzaRelayBrokerTestCase.java | 3 +- ...ReturnErrorToSenderFailureStrategyTestCase.java | 6 +- ...eliveringInteralInboundStanzaRelayTestCase.java | 8 +-- .../im/handler/PresenceHandlerBaseTestCase.java | 3 +- .../PresenceSubRequestOutHandlerTestCase.java | 15 +++-- .../modules/extension/xep0045_muc/MUCModule.java | 2 +- .../xep0045_muc/handler/MUCIqAdminHandler.java | 2 +- .../xep0045_muc/handler/MUCMessageHandler.java | 2 +- .../xep0045_muc/handler/MUCPresenceHandler.java | 2 +- .../SubscriberPayloadNotificationVisitor.java | 2 +- 19 files changed, 122 insertions(+), 146 deletions(-) diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/LocalDeliveryUtils.java b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/LocalDeliveryUtils.java deleted file mode 100644 index bdff427..0000000 --- a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/LocalDeliveryUtils.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.vysper.xmpp.delivery; - -import org.apache.vysper.xmpp.server.StanzaReceivingSessionContext; -import org.apache.vysper.xmpp.stanza.Stanza; -import org.apache.vysper.xmpp.state.resourcebinding.ResourceRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author The Apache MINA Project (d...@mina.apache.org) - */ -public class LocalDeliveryUtils { - - private static final Logger logger = LoggerFactory.getLogger(LocalDeliveryUtils.class); - - /** - * delivers a stanza to a server-local resource. used for sending a stanza to - * all resources of the same user. - * - * @param registry - * registry to look up session by resource ID - * @param resource - * receiving resource ID - * @param push - * stanza to be pushed - */ - public static void relayToResourceDirectly(ResourceRegistry registry, String resource, Stanza push) { - try { - StanzaReceivingSessionContext targetContext = registry.getSessionContext(resource); - if (targetContext == null) - return; - targetContext.getResponseWriter().write(push); - } catch (RuntimeException e) { - logger.warn("failed to directly relay stanza to resource " + resource, e); - } - } - -} diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java index 0b89ef9..3af179c 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/StanzaReceiverRelay.java @@ -19,8 +19,11 @@ */ package org.apache.vysper.xmpp.delivery; +import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.vysper.xmpp.addressing.Entity; @@ -38,7 +41,7 @@ import org.apache.vysper.xmpp.stanza.Stanza; */ public class StanzaReceiverRelay implements StanzaRelay { - private final Map<Entity, StanzaReceiver> receiverMap = new HashMap<>(); + private final Map<Entity, StanzaReceiverQueue> receiverMap = new HashMap<>(); private boolean exploitFailureStrategy = true; @@ -56,12 +59,12 @@ public class StanzaReceiverRelay implements StanzaRelay { /** * add new receiver */ - public void add(Entity receiverID, StanzaReceiver receiver) { + public void add(Entity receiverID, StanzaReceiverQueue receiver) { receiverMap.put(receiverID, receiver); } public void relay(StanzaReceivingSessionContext sessionContext, Entity receiver, Stanza stanza, - DeliveryFailureStrategy deliveryFailureStrategy) throws DeliveryException { + DeliveryFailureStrategy deliveryFailureStrategy) throws DeliveryException { if (!isRelaying()) { throw new ServiceNotAvailableException("relay is not relaying"); } @@ -82,6 +85,15 @@ public class StanzaReceiverRelay implements StanzaRelay { countDelivered++; receiverMap.get(receiver).deliver(stanza); } + + public Stanza nextStanza(){ + return receiverMap.values() + .stream() + .map(StanzaReceiverQueue::getNext) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } public boolean isRelaying() { return acceptingMode.get(); @@ -108,13 +120,10 @@ public class StanzaReceiverRelay implements StanzaRelay { */ public void resetAll() { synchronized (receiverMap) { - for (StanzaReceiver receiver : receiverMap.values()) { - if (receiver instanceof StanzaReceiverQueue) { - StanzaReceiverQueue stanzaReceiverQueue = (StanzaReceiverQueue) receiver; - // emptying by retrieving all stanzas from the queue - while (stanzaReceiverQueue.getNext() != null) { - // continue - } + for (StanzaReceiverQueue receiver : receiverMap.values()) { + // emptying by retrieving all stanzas from the queue + while (receiver.getNext() != null) { + // continue } } diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java index 6b7eaef..fd87cc7 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java @@ -29,7 +29,11 @@ import org.apache.vysper.xmpp.stanza.Stanza; */ public class IgnoreFailureStrategy implements DeliveryFailureStrategy { - public final static IgnoreFailureStrategy IGNORE_FAILURE_STRATEGY = new IgnoreFailureStrategy(); + public static final IgnoreFailureStrategy INSTANCE = IgnoreFailureStrategy.INSTANCE; + + private IgnoreFailureStrategy() { + + } public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryException) throws DeliveryException { diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java index 88a7610..145d108 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java @@ -82,7 +82,7 @@ public class ReturnErrorToSenderFailureStrategy implements DeliveryFailureStrate if (deliveryExceptions == null) { XMPPCoreStanza error = XMPPCoreStanza.getWrapper(ServerErrorResponses.getStanzaError(stanzaErrorCondition, failedCoreStanza, errorType, "stanza could not be delivered", "en", null)); - stanzaBroker.write(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY); + stanzaBroker.write(error.getTo(), error, IgnoreFailureStrategy.INSTANCE); } else if (deliveryExceptions.size() == 1) { DeliveryException deliveryException = deliveryExceptions.get(0); if (deliveryException instanceof LocalRecipientOfflineException) { @@ -110,7 +110,7 @@ public class ReturnErrorToSenderFailureStrategy implements DeliveryFailureStrate StanzaBuilder builder = StanzaBuilder.createPresenceStanza(from, to, null, UNSUBSCRIBED, null, null); final Stanza finalStanza = builder.build(); - stanzaBroker.write(to, finalStanza, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY); + stanzaBroker.write(to, finalStanza, IgnoreFailureStrategy.INSTANCE); return; } } @@ -121,7 +121,7 @@ public class ReturnErrorToSenderFailureStrategy implements DeliveryFailureStrate ServerErrorResponses.getStanzaError(smartDeliveryException.getStanzaErrorCondition(), failedCoreStanza, smartDeliveryException.getStanzaErrorType(), smartDeliveryException.getErrorText(), "en", null)); - stanzaBroker.write(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY); + stanzaBroker.write(error.getTo(), error, IgnoreFailureStrategy.INSTANCE); } } else if (deliveryExceptions.size() > 1) { throw new RuntimeException("cannot return to sender for multiple failed deliveries"); diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/AbstractPresenceSpecializedHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/AbstractPresenceSpecializedHandler.java index 2605967..2b3d05e 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/AbstractPresenceSpecializedHandler.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/AbstractPresenceSpecializedHandler.java @@ -58,7 +58,7 @@ public abstract class AbstractPresenceSpecializedHandler { protected void relayStanza(Entity receiver, Stanza stanza, StanzaBroker stanzaBroker) { try { - stanzaBroker.write(receiver, stanza, new IgnoreFailureStrategy()); + stanzaBroker.write(receiver, stanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { logger.warn("presence relaying failed ", e); } diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java index 38b34c5..477e5ff 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceAvailabilityHandler.java @@ -143,7 +143,7 @@ public class PresenceAvailabilityHandler extends AbstractPresenceSpecializedHand } } - protected Stanza handleInboundPresenceError(PresenceStanza stanza, ServerRuntimeContext serverRuntimeContext, + private Stanza handleInboundPresenceError(PresenceStanza stanza, ServerRuntimeContext serverRuntimeContext, SessionContext sessionContext, ResourceRegistry registry) { return stanza; // send to client } @@ -364,7 +364,7 @@ public class PresenceAvailabilityHandler extends AbstractPresenceSpecializedHand } try { - stanzaBroker.write(to, redirectDirectedStanza, new IgnoreFailureStrategy()); + stanzaBroker.write(to, redirectDirectedStanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { logger.warn("relaying directed presence failed. from = " + from + ", to = " + to); } diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubscriptionHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubscriptionHandler.java index 2a2d39c..1be141b 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubscriptionHandler.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubscriptionHandler.java @@ -37,7 +37,6 @@ import org.apache.vysper.compliance.SpecCompliance; import org.apache.vysper.compliance.SpecCompliant; import org.apache.vysper.xmpp.addressing.Entity; import org.apache.vysper.xmpp.addressing.EntityImpl; -import org.apache.vysper.xmpp.delivery.LocalDeliveryUtils; import org.apache.vysper.xmpp.delivery.failure.DeliveryException; import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy; import org.apache.vysper.xmpp.modules.roster.RosterException; @@ -66,7 +65,7 @@ import org.slf4j.LoggerFactory; */ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHandler { - final Logger logger = LoggerFactory.getLogger(PresenceSubscriptionHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(PresenceSubscriptionHandler.class); @Override /* package */Stanza executeCorePresence(ServerRuntimeContext serverRuntimeContext, boolean isOutboundStanza, @@ -109,8 +108,7 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand case SUBSCRIBE: // RFC3921bis-04#3.1.2 // user requests subsription to contact - handleOutboundSubscriptionRequest(stampedStanza, serverRuntimeContext, sessionContext, registry, - rosterManager, stanzaBroker); + handleOutboundSubscriptionRequest(stampedStanza, sessionContext, registry, rosterManager, stanzaBroker); break; case SUBSCRIBED: @@ -149,21 +147,21 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand case SUBSCRIBED: // RFC3921bis-04#3.1.6 // contact approves user's subsription request - return handleInboundSubscriptionApproval(presenceStanza, serverRuntimeContext, sessionContext, registry, - rosterManager); + return handleInboundSubscriptionApproval(presenceStanza, sessionContext, registry, rosterManager, + stanzaBroker); case UNSUBSCRIBE: // RFC3921bis-04#3.3.3 // contact unsubscribes handleInboundUnsubscription(presenceStanza, serverRuntimeContext, sessionContext, registry, - rosterManager); + rosterManager, stanzaBroker); return null; case UNSUBSCRIBED: // RFC3921bis-04#3.2.3 // contact denies subsription - handleInboundSubscriptionCancellation(presenceStanza, serverRuntimeContext, sessionContext, registry, - rosterManager); + handleInboundSubscriptionCancellation(presenceStanza, sessionContext, registry, rosterManager, + stanzaBroker); return null; default: @@ -178,7 +176,8 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand @SpecCompliant(spec = "RFC3921bis-05", section = "3.3.3", status = IN_PROGRESS, comment = "current impl based hereupon"), @SpecCompliant(spec = "RFC3921bis-08", section = "3.3.3", status = NOT_STARTED, comment = "substantial additions from bis-05 not yet taken into account") }) protected void handleInboundUnsubscription(PresenceStanza stanza, ServerRuntimeContext serverRuntimeContext, - SessionContext sessionContext, ResourceRegistry registry, RosterManager rosterManager) { + SessionContext sessionContext, ResourceRegistry registry, RosterManager rosterManager, + StanzaBroker stanzaBroker) { Entity contact = stanza.getFrom(); Entity user = stanza.getTo(); @@ -211,7 +210,12 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } @@ -249,30 +253,33 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand relayStanza(contact, stanza, stanzaBroker); - sendRosterUpdate(sessionContext, registry, user, rosterItem); + sendRosterUpdate(sessionContext, registry, user, rosterItem, stanzaBroker); } /** * send roster push to all of the user's interested resources */ protected void sendRosterUpdate(SessionContext sessionContext, ResourceRegistry registry, Entity user, - RosterItem rosterItem) { + RosterItem rosterItem, StanzaBroker stanzaBroker) { List<String> resources = registry.getInterestedResources(user); for (String resource : resources) { Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } @SpecCompliance(compliant = { @SpecCompliant(spec = "RFC3921bis-05", section = "3.2.3", status = IN_PROGRESS, comment = "current impl based hereupon"), @SpecCompliant(spec = "RFC3921bis-08", section = "3.2.3", status = NOT_STARTED, comment = "additions from bis-05 not yet taken into account") }) - protected void handleInboundSubscriptionCancellation(PresenceStanza stanza, - ServerRuntimeContext serverRuntimeContext, SessionContext sessionContext, ResourceRegistry registry, - RosterManager rosterManager) { + protected void handleInboundSubscriptionCancellation(PresenceStanza stanza, SessionContext sessionContext, + ResourceRegistry registry, RosterManager rosterManager, StanzaBroker stanzaBroker) { Entity contact = stanza.getFrom(); Entity user = stanza.getTo(); @@ -306,7 +313,11 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } @@ -342,7 +353,7 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand relayStanza(contact, stanza, stanzaBroker); // send roster push to all of the user's interested resources - sendRosterUpdate(sessionContext, registry, user, rosterItem); + sendRosterUpdate(sessionContext, registry, user, rosterItem, stanzaBroker); } @SpecCompliance(compliant = { @@ -375,7 +386,7 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand relayStanza(contact, stanza, stanzaBroker); // send roster push to all of the user's interested resources - sendRosterUpdate(sessionContext, registry, user, rosterItem); + sendRosterUpdate(sessionContext, registry, user, rosterItem, stanzaBroker); // send presence information from user's available resource to the // contact @@ -411,8 +422,8 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand @SpecCompliance(compliant = { @SpecCompliant(spec = "RFC3921bis-05", section = "3.1.6", status = IN_PROGRESS, comment = "current impl based hereupon"), @SpecCompliant(spec = "RFC3921bis-08", section = "3.1.6", status = NOT_STARTED, comment = "minor rephrasing from bis-05 not yet taken into account") }) - protected Stanza handleInboundSubscriptionApproval(PresenceStanza stanza, ServerRuntimeContext serverRuntimeContext, - SessionContext sessionContext, ResourceRegistry registry, RosterManager rosterManager) { + protected Stanza handleInboundSubscriptionApproval(PresenceStanza stanza, SessionContext sessionContext, + ResourceRegistry registry, RosterManager rosterManager, StanzaBroker stanzaBroker) { Entity contact = stanza.getFrom(); Entity user = stanza.getTo(); @@ -442,7 +453,11 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } else { // silently drop the stanza @@ -495,16 +510,15 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand @SpecCompliance(compliant = { @SpecCompliant(spec = "RFC3921bis-05", section = "3.1.2", status = IN_PROGRESS, comment = "current impl based hereupon"), @SpecCompliant(spec = "RFC3921bis-08", section = "3.1.2", status = NOT_STARTED, comment = "major rephrasing from bis-05 not yet taken into account") }) - protected void handleOutboundSubscriptionRequest(PresenceStanza stanza, ServerRuntimeContext serverRuntimeContext, - SessionContext sessionContext, ResourceRegistry registry, RosterManager rosterManager, - StanzaBroker stanzaBroker) { + private void handleOutboundSubscriptionRequest(PresenceStanza stanza, SessionContext sessionContext, + ResourceRegistry registry, RosterManager rosterManager, StanzaBroker stanzaBroker) { Entity user = stanza.getFrom(); Entity contact = stanza.getTo().getBareJID(); // TODO schedule a observer which can re-send the request - RosterItem rosterItem = null; + RosterItem rosterItem; try { rosterItem = getExistingOrNewRosterItem(rosterManager, user.getBareJID(), contact); @@ -521,7 +535,7 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand // relay the stanza to the contact (via the contact's server) try { - stanzaBroker.write(stanza.getTo(), stanza, new IgnoreFailureStrategy()); + stanzaBroker.write(stanza.getTo(), stanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { e.printStackTrace(); } @@ -532,7 +546,12 @@ public class PresenceSubscriptionHandler extends AbstractPresenceSpecializedHand Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/modules/roster/handler/RosterIQHandler.java b/server/core/src/main/java/org/apache/vysper/xmpp/modules/roster/handler/RosterIQHandler.java index a46770b..bbf5f1f 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/modules/roster/handler/RosterIQHandler.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/modules/roster/handler/RosterIQHandler.java @@ -33,7 +33,6 @@ import org.apache.vysper.compliance.SpecCompliance; import org.apache.vysper.compliance.SpecCompliant; import org.apache.vysper.xmpp.addressing.Entity; import org.apache.vysper.xmpp.addressing.EntityImpl; -import org.apache.vysper.xmpp.delivery.LocalDeliveryUtils; import org.apache.vysper.xmpp.delivery.failure.DeliveryException; import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy; import org.apache.vysper.xmpp.modules.core.base.handler.DefaultIQHandler; @@ -70,7 +69,7 @@ import org.slf4j.LoggerFactory; @SpecCompliant(spec = "rfc3921bis-08", section = "2", status = IN_PROGRESS, coverage = COMPLETE) public class RosterIQHandler extends DefaultIQHandler { - final Logger logger = LoggerFactory.getLogger(RosterIQHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(RosterIQHandler.class); @Override protected boolean verifyNamespace(Stanza stanza) { @@ -184,10 +183,10 @@ public class RosterIQHandler extends DefaultIQHandler { if (setRosterItem.getName() != null) { existingItem.setName(setRosterItem.getName()); - logger.debug(user.getBareJID() + " roster: set roster item name to " + setRosterItem.getName()); + LOG.debug(user.getBareJID() + " roster: set roster item name to " + setRosterItem.getName()); } existingItem.setGroups(setRosterItem.getGroups()); - logger.debug(user.getBareJID() + " roster: roster item groups set to " + setRosterItem.getGroups()); + LOG.debug(user.getBareJID() + " roster: roster item groups set to " + setRosterItem.getGroups()); try { // update contact persistently @@ -198,7 +197,7 @@ public class RosterIQHandler extends DefaultIQHandler { null)); } - pushRosterItemToInterestedResources(sessionContext, user, existingItem); + pushRosterItemToInterestedResources(sessionContext, user, existingItem, stanzaBroker); return Collections.singletonList( RosterStanzaUtils.createRosterItemIQ(user, stanza.getID(), IQStanzaType.RESULT, existingItem)); @@ -231,36 +230,41 @@ public class RosterIQHandler extends DefaultIQHandler { if (unsubscribedStanza != null) { try { - stanzaBroker.write(contactJid, unsubscribedStanza, new IgnoreFailureStrategy()); + stanzaBroker.write(contactJid, unsubscribedStanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { - logger.warn("failure sending unsubscribed on roster remove", e); + LOG.warn("failure sending unsubscribed on roster remove", e); } } if (unsubscribeStanza != null) { try { - stanzaBroker.write(contactJid, unsubscribeStanza, new IgnoreFailureStrategy()); + stanzaBroker.write(contactJid, unsubscribeStanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { - logger.warn("failure sending unsubscribe on roster remove", e); + LOG.warn("failure sending unsubscribe on roster remove", e); } } // send roster item push to all interested resources - pushRosterItemToInterestedResources(sessionContext, user, new RosterItem(contactJid, REMOVE)); + pushRosterItemToInterestedResources(sessionContext, user, new RosterItem(contactJid, REMOVE), stanzaBroker); // return success return Collections .singletonList(StanzaBuilder.createIQStanza(null, user, IQStanzaType.RESULT, stanza.getID()).build()); } - private void pushRosterItemToInterestedResources(SessionContext sessionContext, Entity user, - RosterItem rosterItem) { + private void pushRosterItemToInterestedResources(SessionContext sessionContext, Entity user, RosterItem rosterItem, + StanzaBroker stanzaBroker) { ResourceRegistry registry = sessionContext.getServerRuntimeContext().getResourceRegistry(); List<String> resources = registry.getInterestedResources(user.getBareJID()); for (String resource : resources) { Entity userResource = new EntityImpl(user, resource); Stanza push = RosterStanzaUtils.createRosterItemPushIQ(userResource, sessionContext.nextSequenceValue(), rosterItem); - LocalDeliveryUtils.relayToResourceDirectly(registry, resource, push); + + try { + stanzaBroker.write(userResource, push, IgnoreFailureStrategy.INSTANCE); + } catch (DeliveryException e) { + LOG.error(e.getMessage(), e); + } } } diff --git a/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerRuntimeContext.java b/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerRuntimeContext.java index d73149a..5d69892 100644 --- a/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerRuntimeContext.java +++ b/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerRuntimeContext.java @@ -28,13 +28,10 @@ import org.apache.vysper.event.EventBus; import org.apache.vysper.storage.StorageProvider; import org.apache.vysper.xmpp.addressing.Entity; import org.apache.vysper.xmpp.authentication.UserAuthentication; -import org.apache.vysper.xmpp.delivery.StanzaRelay; import org.apache.vysper.xmpp.modules.Module; import org.apache.vysper.xmpp.modules.ServerRuntimeContextService; import org.apache.vysper.xmpp.protocol.StanzaHandler; -import org.apache.vysper.xmpp.protocol.StanzaProcessor; import org.apache.vysper.xmpp.server.components.Component; -import org.apache.vysper.xmpp.server.components.ComponentStanzaProcessor; import org.apache.vysper.xmpp.server.s2s.XMPPServerConnectorRegistry; import org.apache.vysper.xmpp.stanza.Stanza; import org.apache.vysper.xmpp.state.presence.LatestPresenceCache; @@ -73,14 +70,14 @@ public interface ServerRuntimeContext { void registerComponent(Component component); boolean hasComponentStanzaProcessor(Entity entity); - + XMPPServerConnectorRegistry getServerConnectorRegistry(); - + List<Module> getModules(); <T> T getModule(Class<T> clazz); - + void addModule(Module module); - + EventBus getEventBus(); } diff --git a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/StanzaRelayBrokerTestCase.java b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/StanzaRelayBrokerTestCase.java index ce9b904..1afb3b9 100644 --- a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/StanzaRelayBrokerTestCase.java +++ b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/StanzaRelayBrokerTestCase.java @@ -28,7 +28,6 @@ import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy; import org.apache.vysper.xmpp.delivery.failure.ServiceNotAvailableException; import org.apache.vysper.xmpp.server.ServerFeatures; import org.apache.vysper.xmpp.server.ServerRuntimeContext; -import org.apache.vysper.xmpp.server.SessionContext; import org.apache.vysper.xmpp.server.StanzaReceivingSessionContext; import org.apache.vysper.xmpp.stanza.Stanza; import org.apache.vysper.xmpp.stanza.StanzaBuilder; @@ -50,7 +49,7 @@ public class StanzaRelayBrokerTestCase extends Mockito { private static final String LANG = "en"; private static final String BODY = "Hello world"; - private DeliveryFailureStrategy failureStrategy = IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY; + private DeliveryFailureStrategy failureStrategy = IgnoreFailureStrategy.INSTANCE; private StanzaRelay internalRelay = mock(StanzaRelay.class); private StanzaRelay externalRelay = mock(StanzaRelay.class); diff --git a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategyTestCase.java b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategyTestCase.java index 1972540..18ad439 100644 --- a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategyTestCase.java +++ b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategyTestCase.java @@ -70,7 +70,7 @@ public class ReturnErrorToSenderFailureStrategyTestCase extends Mockito { .build(); ArgumentCaptor<Stanza> stanzaCaptor = ArgumentCaptor.forClass(Stanza.class); - verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY)); + verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.INSTANCE)); StanzaAssert.assertEquals(expected, stanzaCaptor.getValue()); } @@ -91,7 +91,7 @@ public class ReturnErrorToSenderFailureStrategyTestCase extends Mockito { .build(); ArgumentCaptor<Stanza> stanzaCaptor = ArgumentCaptor.forClass(Stanza.class); - verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY)); + verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.INSTANCE)); StanzaAssert.assertEquals(expected, stanzaCaptor.getValue()); } @@ -106,7 +106,7 @@ public class ReturnErrorToSenderFailureStrategyTestCase extends Mockito { Stanza expected = StanzaBuilder.createPresenceStanza(TO, FROM, null, UNSUBSCRIBED, null, null).build(); ArgumentCaptor<Stanza> stanzaCaptor = ArgumentCaptor.forClass(Stanza.class); - verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY)); + verify(stanzaBroker).write(eq(FROM), stanzaCaptor.capture(), eq(IgnoreFailureStrategy.INSTANCE)); StanzaAssert.assertEquals(expected, stanzaCaptor.getValue()); } diff --git a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java index b70fc76..ed63a6d 100644 --- a/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java +++ b/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInteralInboundStanzaRelayTestCase.java @@ -105,7 +105,7 @@ public class DeliveringInteralInboundStanzaRelayTestCase extends TestCase { Stanza stanza = StanzaBuilder.createMessageStanza(FROM_ENTITY, TO_ENTITY, "en", "Hello").build(); try { - stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, new IgnoreFailureStrategy()); + stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, IgnoreFailureStrategy.INSTANCE); Stanza recordedStanza = sessionContext.getNextRecordedResponse(1000); assertNotNull("stanza delivered", recordedStanza); assertEquals("Hello", recordedStanza.getSingleInnerElementsNamed("body").getSingleInnerText().getText()); @@ -123,7 +123,7 @@ public class DeliveringInteralInboundStanzaRelayTestCase extends TestCase { Stanza stanza = StanzaBuilder.createMessageStanza(FROM_ENTITY, TO_ENTITY, "en", "Hello").build(); try { - stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, new IgnoreFailureStrategy()); + stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, IgnoreFailureStrategy.INSTANCE); Stanza recordedStanza = sessionContext.getNextRecordedResponse(1000); assertNull("stanza not delivered to unbound", recordedStanza); } catch (DeliveryException e) { @@ -151,7 +151,7 @@ public class DeliveringInteralInboundStanzaRelayTestCase extends TestCase { Stanza stanza = StanzaBuilder.createMessageStanza(FROM_ENTITY, TO_ENTITY, "en", "Hello").build(); try { - stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, new IgnoreFailureStrategy()); + stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, IgnoreFailureStrategy.INSTANCE); Stanza recordedStanza_1 = sessionContextTO_ENTITY_1_prio3.getNextRecordedResponse(100); assertNotNull("stanza 1 delivered", recordedStanza_1); Stanza recordedStanza_2 = sessionContextTO_ENTITY_2_prio0.getNextRecordedResponse(100); @@ -185,7 +185,7 @@ public class DeliveringInteralInboundStanzaRelayTestCase extends TestCase { Stanza stanza = StanzaBuilder.createMessageStanza(FROM_ENTITY, TO_ENTITY, "en", "Hello").build(); try { - stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, new IgnoreFailureStrategy()); + stanzaRelay.relay(sessionContext, TO_ENTITY, stanza, IgnoreFailureStrategy.INSTANCE); Stanza recordedStanza_1 = sessionContextTO_ENTITY_1_prio3.getNextRecordedResponse(100); assertNotNull("stanza 1 delivered", recordedStanza_1); Stanza recordedStanza_2 = sessionContextTO_ENTITY_2_prio0.getNextRecordedResponse(100); diff --git a/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceHandlerBaseTestCase.java b/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceHandlerBaseTestCase.java index 71740b8..a1a4c46 100644 --- a/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceHandlerBaseTestCase.java +++ b/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceHandlerBaseTestCase.java @@ -209,6 +209,7 @@ abstract public class PresenceHandlerBaseTestCase extends TestCase { * @return NULL, if no stanza available matching the user's resource id */ protected Stanza getNextDirectResponseFor(TestUser testUser) { - return sessionContext.getNextRecordedResponseForResource(testUser.getBoundResourceId()); + return testUser.getNextStanza(); +// return sessionContext.getNextRecordedResponseForResource(testUser.getBoundResourceId()); } } diff --git a/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubRequestOutHandlerTestCase.java b/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubRequestOutHandlerTestCase.java index 9a65778..8d8153e 100644 --- a/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubRequestOutHandlerTestCase.java +++ b/server/core/src/test/java/org/apache/vysper/xmpp/modules/core/im/handler/PresenceSubRequestOutHandlerTestCase.java @@ -26,6 +26,7 @@ import static org.apache.vysper.xmpp.modules.roster.SubscriptionType.NONE; import org.apache.vysper.xmpp.addressing.EntityFormatException; import org.apache.vysper.xmpp.addressing.EntityImpl; +import org.apache.vysper.xmpp.delivery.StanzaReceiverRelay; import org.apache.vysper.xmpp.protocol.SimpleStanzaBroker; import org.apache.vysper.xmpp.stanza.PresenceStanzaType; import org.apache.vysper.xmpp.stanza.Stanza; @@ -50,16 +51,18 @@ public class PresenceSubRequestOutHandlerTestCase extends PresenceHandlerBaseTes XMPPCoreStanza requestApproval = XMPPCoreStanza .getWrapper(StanzaBuilder.createPresenceStanza(unrelatedUser.getEntityFQ(), initiatingUser.getEntity(), null, PresenceStanzaType.SUBSCRIBED, null, null).build()); - handler.executeCore(requestApproval, sessionContext.getServerRuntimeContext(), false, sessionContext, null); + handler.executeCore(requestApproval, sessionContext.getServerRuntimeContext(), false, sessionContext, + new SimpleStanzaBroker(sessionContext.getStanzaRelay(), sessionContext)); // 3 roster pushes but... + StanzaReceiverRelay relay = (StanzaReceiverRelay) sessionContext.getStanzaRelay(); for (int i = 1; i <= 3; i++) { - Stanza stanza = sessionContext.getNextRecordedResponse(); + Stanza stanza = relay.nextStanza(); assertEquals("iq", stanza.getName()); } // ... BUT no subscription approval (presence) or anything additional to the // roster pushes - assertNull(sessionContext.getNextRecordedResponse()); + assertNull(relay.nextStanza()); resetRecordedStanzas(); } @@ -77,8 +80,7 @@ public class PresenceSubRequestOutHandlerTestCase extends PresenceHandlerBaseTes assertEquals(ResourceState.AVAILABLE_INTERESTED, getResourceState()); // 1 to TO + 3 roster pushes - assertStanzasDeliveredAndRelayed(1); - assertStanzasReceivedDirectly(3); + assertStanzasDeliveredAndRelayed(4); // roster push for 1 interested initiator of _same_ session Stanza initiatorNotification = getNextDirectResponseFor(initiatingUser); @@ -128,8 +130,7 @@ public class PresenceSubRequestOutHandlerTestCase extends PresenceHandlerBaseTes assertEquals(ResourceState.AVAILABLE_INTERESTED, getResourceState()); // 1 to TO + 3 roster pushes - assertStanzasDeliveredAndRelayed(1); - assertStanzasReceivedDirectly(3); + assertStanzasDeliveredAndRelayed(4); // roster push for 1 interested initiator... Stanza initiatorNotification = getNextDirectResponseFor(initiatingUser); diff --git a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/MUCModule.java b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/MUCModule.java index 572e61a..0bd9e82 100644 --- a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/MUCModule.java +++ b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/MUCModule.java @@ -227,7 +227,7 @@ public class MUCModule extends DefaultDiscoAwareModule } try { - stanzaBroker.write(receiver, builder.build(), new IgnoreFailureStrategy()); + stanzaBroker.write(receiver, builder.build(), IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { // ignore } diff --git a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCIqAdminHandler.java b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCIqAdminHandler.java index 171aa44..73b6530 100644 --- a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCIqAdminHandler.java +++ b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCIqAdminHandler.java @@ -398,7 +398,7 @@ public class MUCIqAdminHandler extends DefaultIQHandler { protected void relayStanza(Entity receiver, Stanza stanza, StanzaBroker stanzaBroker) { try { - stanzaBroker.write(receiver, stanza, new IgnoreFailureStrategy()); + stanzaBroker.write(receiver, stanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { logger.warn("presence relaying failed ", e); } diff --git a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCMessageHandler.java b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCMessageHandler.java index caf507f..8be0f26 100644 --- a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCMessageHandler.java +++ b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCMessageHandler.java @@ -309,7 +309,7 @@ public class MUCMessageHandler extends DefaultMessageHandler { protected void relayStanza(Entity receiver, Stanza stanza, StanzaBroker stanzaBroker) { try { - stanzaBroker.write(receiver, stanza, new IgnoreFailureStrategy()); + stanzaBroker.write(receiver, stanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { logger.warn("presence relaying failed ", e); } diff --git a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCPresenceHandler.java b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCPresenceHandler.java index 0fed2cc..f8c28d7 100644 --- a/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCPresenceHandler.java +++ b/server/extensions/xep0045-muc/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0045_muc/handler/MUCPresenceHandler.java @@ -503,7 +503,7 @@ public class MUCPresenceHandler extends DefaultPresenceHandler { protected void relayStanza(Entity receiver, Stanza stanza, ServerRuntimeContext serverRuntimeContext, StanzaBroker stanzaBroker) { try { - stanzaBroker.write(receiver, stanza, new IgnoreFailureStrategy()); + stanzaBroker.write(receiver, stanza, IgnoreFailureStrategy.INSTANCE); } catch (DeliveryException e) { logger.warn("presence relaying failed ", e); } diff --git a/server/extensions/xep0060-pubsub/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0060_pubsub/SubscriberPayloadNotificationVisitor.java b/server/extensions/xep0060-pubsub/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0060_pubsub/SubscriberPayloadNotificationVisitor.java index 7f8c680..7788bdc 100644 --- a/server/extensions/xep0060-pubsub/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0060_pubsub/SubscriberPayloadNotificationVisitor.java +++ b/server/extensions/xep0060-pubsub/src/main/java/org/apache/vysper/xmpp/modules/extension/xep0060_pubsub/SubscriberPayloadNotificationVisitor.java @@ -43,7 +43,7 @@ public class SubscriberPayloadNotificationVisitor implements SubscriberVisitor { final Logger logger = LoggerFactory.getLogger(SubscriberPayloadNotificationVisitor.class); // Ignore all failures during the delivery (fire and forget) - private DeliveryFailureStrategy dfs = new IgnoreFailureStrategy(); + private DeliveryFailureStrategy dfs = IgnoreFailureStrategy.INSTANCE; // The StanzaBroker we use to send the messages private final StanzaBroker stanzaBroker;