Author: fhanik Date: Mon May 8 15:53:32 2006 New Revision: 405211 URL: http://svn.apache.org/viewcvs?rev=405211&view=rev Log: Fixed backup code to continue until the data has been successfully backed up, or we have run out of members to backup the data to
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=405211&r1=405210&r2=405211&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Mon May 8 15:53:32 2006 @@ -111,7 +111,8 @@ } protected Member[] wrap(Member m) { - return new Member[] {m}; + if ( m == null ) return new Member[0]; + else return new Member[] {m}; } private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { @@ -536,15 +537,22 @@ } //while } - int currentNode = 0; - public Member getNextBackupNode() { - Member[] members = getMapMembers(); - if (members.length == 0)return null; + protected int currentNode = 0; + public int getNextBackupIndex() { + int size = mapMembers.size(); + if (mapMembers.size() == 0)return -1; int node = currentNode++; - if (node >= members.length) { + if (node >= size) { node = 0; currentNode = 0; } + return node; + } + public Member getNextBackupNode() { + Member[] members = getMapMembers(); + int node = getNextBackupIndex(); + if ( members.length == 0 || node==-1) return null; + if ( node >= members.length ) node = 0; return members[node]; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=405211&r1=405210&r2=405211&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Mon May 8 15:53:32 2006 @@ -125,24 +125,53 @@ * @throws ChannelException */ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { - //select a backup node - Member next = getNextBackupNode(); + Member[] members = getMapMembers(); + int firstIdx = getNextBackupIndex(); + int nextIdx = firstIdx; + Member[] backup = new Member[0]; - if ( next == null ) return null; + //there are no backups + if ( members.length == 0 || firstIdx == -1 ) return backup; - Member[] backup = wrap(next); - MapMessage msg = null; - //publish the data out to all nodes - Member[] proxies = excludeFromSet(backup,getMapMembers()); - if ( proxies.length > 0 ) { - msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, - (Serializable) key, null, null, backup); - getChannel().send(proxies, msg, getChannelSendOptions()); - } - //publish the backup data to one node - msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, - (Serializable) key, (Serializable) value, null, backup); - getChannel().send(backup, msg, getChannelSendOptions()); + boolean success = false; + do { + //select a backup node + Member next = members[firstIdx]; + + //increment for the next round of back up selection + nextIdx = firstIdx + 1; + if ( nextIdx >= members.length ) nextIdx = 0; + + if (next == null) { + continue; + } + MapMessage msg = null; + try { + backup = wrap(next); + //publish the backup data to one node + msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, + (Serializable) key, (Serializable) value, null, backup); + getChannel().send(backup, msg, getChannelSendOptions()); + //we published out to a backup, mark the test success + success = true; + }catch ( ChannelException x ) { + log.error("Unable to replicate backup key:"+key+" to backup:"+next+". Reason:"+x.getMessage(),x); + } + try { + //publish the data out to all nodes + Member[] proxies = excludeFromSet(backup, getMapMembers()); + if (success && proxies.length > 0 ) { + msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, + (Serializable) key, null, null, backup); + getChannel().send(proxies, msg, getChannelSendOptions()); + } + }catch ( ChannelException x ) { + //log the error, but proceed, this should only happen if a node went down, + //and if the node went down, then it can't receive the message, the others + //should still get it. + log.error("Unable to replicate proxy key:"+key+" to backup:"+next+". Reason:"+x.getMessage(),x); + } + } while ( !success && (firstIdx!=nextIdx)); return backup; } @@ -180,7 +209,10 @@ } else if ( entry.isProxy() ) { //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); - getChannel().send(getMapMembersExcl(backup),msg,getChannelSendOptions()); + Member[] dest = getMapMembersExcl(backup); + if ( dest!=null && dest.length >0) { + getChannel().send(dest, msg, getChannelSendOptions()); + } } entry.setBackupNodes(backup); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]