Author: fhanik
Date: Fri Mar 10 09:24:53 2006
New Revision: 384858

URL: http://svn.apache.org/viewcvs?rev=384858&view=rev
Log:
the map implementation is complete and ready to be tested

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384858&r1=384857&r2=384858&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 10 09:24:53 2006 @@ -15,31 +15,30 @@ */ package org.apache.catalina.tribes.tipis; -import java.util.HashMap; -import java.util.Map; -import org.apache.catalina.tribes.Channel; -import java.io.Serializable; -import org.apache.catalina.tribes.Member; -import java.io.UnsupportedEncodingException; +import java.io.Externalizable; import java.io.IOException; -import org.apache.catalina.tribes.io.DirectByteArrayOutputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.ObjectOutputStream; -import org.apache.catalina.tribes.io.XByteBuffer; -import java.util.Set; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelListener; -import java.util.Collection; +import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; -import java.io.Externalizable; -import java.io.ObjectInput; -import java.io.ObjectOutput; 
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream; +import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.mcast.McastMember; -import java.util.Iterator; -import org.apache.catalina.tribes.ChannelException; -import java.util.LinkedList; -import java.util.LinkedHashSet; -import java.util.ArrayList; -import java.util.Arrays; /** * @todo implement periodic sync/transfer @@ -55,9 +54,10 @@ // INSTANCE VARIABLES //------------------------------------------------------------------------------ - private Channel channel; - private RpcChannel rpcChannel; - private byte[] mapContextName; + private transient Channel channel; + private transient RpcChannel rpcChannel; + private transient byte[] mapContextName; + private transient boolean stateTransferred = false; //------------------------------------------------------------------------------ @@ -109,13 +109,36 @@ this.rpcChannel = null; this.channel = null; super.clear(); + this.stateTransferred = false; } //------------------------------------------------------------------------------ // GROUP COM INTERFACES //------------------------------------------------------------------------------ public void transferState() { - throw new UnsupportedOperationException(); + try { + Member backup = channel.getMembers().length>0?channel.getMembers()[0]:null; + if ( backup != null ) { + MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_STATE,false, + null,null,null,null); + Response[] resp = rpcChannel.send(new Member[] {backup},msg,rpcChannel.FIRST_REPLY,TIME_OUT); + if ( resp.length > 0 ) { + msg = (MapMessage)resp[0].getMessage(); + ArrayList list = (ArrayList)msg.getValue(); + for (int i=0; i<list.size(); i++ ) { + MapMessage m = (MapMessage)list.get(i); + MapEntry entry = new MapEntry(m.getKey(),m.getValue()); + entry.setBackup(false); + entry.setProxy(true); + entry.setBackupNode(m.getBackupNode()); + super.put(entry.getKey(),entry); + } + } + } + } catch ( ChannelException x 
) { + log.error("Unable to transfer LazyReplicatedMap state.",x); + } + stateTransferred = true; } /** @@ -126,12 +149,33 @@ public Serializable replyRequest(Serializable msg, Member sender) { if ( !(msg instanceof MapMessage) ) return null; MapMessage mapmsg = (MapMessage)msg; - if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null; - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if ( entry == null ) return null; - mapmsg.setValue((Serializable)entry.getValue()); - return mapmsg; + //backup request + if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null)return null; + mapmsg.setValue( (Serializable) entry.getValue()); + return mapmsg; + } + + //state transfer request + if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) { + ArrayList list = new ArrayList(); + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + MapMessage me = new MapMessage(mapContextName,MapMessage.MSG_PROXY, + false,(Serializable)entry.getKey(),(Serializable)entry.getValue(), + null,entry.getBackupNode()); + list.add(me); + } + mapmsg.setValue(list); + return mapmsg; + } + + return null; + } /** @@ -145,8 +189,45 @@ } public void messageReceived(Serializable msg, Member sender) { - throw new UnsupportedOperationException(); //todo implement all the messages that we can receive + //messages we can receive are MSG_PROXY, MSG_BACKUP + if ( !(msg instanceof MapMessage) ) return; + + MapMessage mapmsg = (MapMessage)msg; + if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) { + MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue()); + entry.setBackup(false); + entry.setProxy(true); + entry.setBackupNode(mapmsg.getBackupNode()); + super.put(entry.getKey(),entry); + } + + if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if ( entry == 
null ) { + entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue()); + entry.setBackup(true); + entry.setProxy(false); + entry.setBackupNode(mapmsg.getBackupNode()); + super.put(entry.getKey(), entry); + } else { + if ( mapmsg.isDiff() ) { + if ( entry.getValue() instanceof Diffable ) { + Diffable diff = (Diffable)entry.getValue(); + try { + diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); + }catch ( IOException x ) { + log.error("Unable to apply diff to key:"+entry.getKey(),x); + } + } else { + log.warn("Received a DIFF replication, but object["+entry.getValue()+"] does not implement Diffable"); + } + } else { + entry.setValue(mapmsg.getValue()); + } + } + } + } public boolean accept(Serializable msg, Member sender) { @@ -304,7 +385,17 @@ } public boolean containsValue(Object value) { - return super.containsValue(value); + if ( value == null ) { + return super.containsValue(value); + } else { + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if (entry.isPrimary() && value.equals(entry.getValue())) return true; + }//while + return false; + }//end if } public Object clone() { @@ -440,11 +531,11 @@ } public int hashCode() { - return key.hashCode(); + return value.hashCode(); } public boolean equals(Object o) { - return key.equals(o); + return value.equals(o); } /** @@ -486,6 +577,7 @@ public static final int MSG_RETRIEVE_BACKUP = 2; public static final int MSG_PROXY = 3; public static final int MSG_REMOVE = 4; + public static final int MSG_STATE = 5; private byte[] mapId; private int msgtype; @@ -545,7 +637,8 @@ in.read(mapId); msgtype = in.readInt(); switch (msgtype) { - case MSG_BACKUP: { + case MSG_BACKUP: + case MSG_STATE: { diff = in.readBoolean(); key = (Serializable)in.readObject(); if ( diff ) { @@ -576,7 +669,8 @@ out.write(mapId); out.writeInt(msgtype); switch (msgtype) { - case MSG_BACKUP: { + case MSG_BACKUP: + case MSG_STATE: { 
out.writeBoolean(diff); out.writeObject(key); if ( diff ) { @@ -601,6 +695,10 @@ } }//switch }//writeExternal + + public Object clone() { + return new MapMessage(this.mapId,this.msgtype,this.diff,this.key,this.value,this.diffvalue,this.node); + } }//MapMessage //------------------------------------------------------------------------------ --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]