Author: azeez Date: Tue Jun 22 18:53:35 2010 New Revision: 956974 URL: http://svn.apache.org/viewvc?rev=956974&view=rev Log: Improvements as suggested in https://issues.apache.org/jira/browse/AXIS2-4749
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=956974&r1=956973&r2=956974&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Jun 22 18:53:35 2010 @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,8 +36,8 @@ import java.util.Map; public class AtMostOnceInterceptor extends ChannelInterceptorBase { private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class); - private static final Map<ChannelMessage, Long> receivedMessages = - new HashMap<ChannelMessage, Long>(); + private static final Map<MessageId, Long> receivedMessages = + new HashMap<MessageId, Long>(); /** * The time a message lives in the receivedMessages Map @@ -52,8 +53,9 @@ public class AtMostOnceInterceptor exten public void messageReceived(ChannelMessage msg) { if (okToProcess(msg.getOptions())) { synchronized (receivedMessages) { - if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it - receivedMessages.put(msg, System.currentTimeMillis()); + MessageId msgId = new MessageId(msg.getUniqueId()); + if (receivedMessages.get(msgId) == null) { // If it is a new message, keep track of it + receivedMessages.put(msgId, System.currentTimeMillis()); super.messageReceived(msg); } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress())); @@ -64,7 +66,7 @@ public class AtMostOnceInterceptor exten } } - private class MessageCleanupTask implements Runnable { + private static class MessageCleanupTask implements Runnable { public void run() { while (true) { // This task should never terminate @@ -74,20 +76,20 @@ public class AtMostOnceInterceptor exten e.printStackTrace(); } try { - List<ChannelMessage> toBeRemoved = new ArrayList<ChannelMessage>(); + List<MessageId> toBeRemoved = new ArrayList<MessageId>(); Thread.yield(); synchronized (receivedMessages) { - for (ChannelMessage msg : receivedMessages.keySet()) { - long arrivalTime = receivedMessages.get(msg); + for (MessageId msgId : receivedMessages.keySet()) { + long arrivalTime = receivedMessages.get(msgId); if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) { - toBeRemoved.add(msg); + toBeRemoved.add(msgId); if (toBeRemoved.size() > 10000) { // Do not allow this thread to run for too long break; } } } - for (ChannelMessage msg : toBeRemoved) { - receivedMessages.remove(msg); + for (MessageId msgId : toBeRemoved) { + receivedMessages.remove(msgId); if (log.isDebugEnabled()) { log.debug("Cleaned up message "); } @@ -99,4 +101,38 @@ public class AtMostOnceInterceptor exten } } } + + /** + * Represents a Message ID + */ + private static class MessageId { + private byte[] id; + + private MessageId(byte[] id) { + this.id = id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MessageId messageId = (MessageId) o; + + if (!Arrays.equals(id, messageId.id)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return Arrays.hashCode(id); + } + } }