Author: fhanik Date: Sun Mar 12 07:39:15 2006 New Revision: 385300 URL: http://svn.apache.org/viewcvs?rev=385300&view=rev Log: The replicated map implements membership logic on a per-map basis, so that you can have multiple maps in a cluster.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=385300&r1=385299&r2=385300&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Sun Mar 12 07:39:15 2006 @@ -89,6 +89,7 @@ private transient byte[] mapContextName; private transient boolean stateTransferred = false; private transient Object stateMutex = new Object(); + private transient ArrayList mapMembers = new ArrayList(); //------------------------------------------------------------------------------ @@ -122,6 +123,19 @@ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); this.channel.addChannelListener(this); this.channel.addMembershipListener(this); + + try { + MapMessage msg = new MapMessage(this.mapContextName,MapMessage.MSG_START, + false,null,null,null,channel.getLocalMember()); + Response[] resp = rpcChannel.send(channel.getMembers(),msg,rpcChannel.FIRST_REPLY,timeout); + for ( int i=0; i<resp.length; i++ ) { + messageReceived(resp[i].getMessage(),resp[i].getSource()); + } + }catch ( ChannelException x ) { + log.warn("Unable to send stop message."); + } + + transferState(); } @@ -130,6 +144,14 @@ } public void finalize() { + try { + MapMessage msg = new MapMessage(this.mapContextName,MapMessage.MSG_STOP, 
+ false,null,null,null,channel.getLocalMember()); + if ( channel!=null) channel.send(channel.getMembers(),msg); + }catch ( ChannelException x ) { + log.warn("Unable to send stop message.",x); + } + if ( this.rpcChannel!=null ) { this.rpcChannel.breakDown(); } @@ -139,6 +161,7 @@ } this.rpcChannel = null; this.channel = null; + this.mapMembers.clear(); super.clear(); this.stateTransferred = false; } @@ -205,7 +228,7 @@ //------------------------------------------------------------------------------ public void transferState() { try { - Member backup = channel.getMembers().length>0?channel.getMembers()[0]:null; + Member backup = mapMembers.size()>0?(Member)mapMembers.get(0):null; if ( backup != null ) { MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_STATE,false, null,null,null,null); @@ -244,6 +267,13 @@ if ( !(msg instanceof MapMessage) ) return null; MapMessage mapmsg = (MapMessage)msg; + //map start request + if ( mapmsg.getMsgType() == mapmsg.MSG_START ) { + mapMemberAdded(sender); + mapmsg.setBackUpNode(channel.getLocalMember()); + return mapmsg; + } + //backup request if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) { MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); @@ -281,7 +311,13 @@ * @param sender Member */ public void leftOver(Serializable msg, Member sender) { - //ignore left over responses + //left over membership messages + if ( !(msg instanceof MapMessage) ) return; + + MapMessage mapmsg = (MapMessage)msg; + if ( mapmsg.getMsgType() == MapMessage.MSG_START ) { + mapMemberAdded(mapmsg.getBackupNode()); + } } public void messageReceived(Serializable msg, Member sender) { @@ -290,6 +326,15 @@ if ( !(msg instanceof MapMessage) ) return; MapMessage mapmsg = (MapMessage)msg; + + if ( mapmsg.getMsgType() == MapMessage.MSG_START ) { + mapMemberAdded(mapmsg.getBackupNode()); + } + + if ( mapmsg.getMsgType() == MapMessage.MSG_STOP ) { + memberDisappeared(mapmsg.getBackupNode()); + } + if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY 
) { MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue()); entry.setBackup(false); @@ -341,8 +386,9 @@ return false; } - public void memberAdded(Member member) { + public void mapMemberAdded(Member member) { //select a backup node if we don't have one + mapMembers.add(member); synchronized (stateMutex) { Iterator i = super.entrySet().iterator(); while (i.hasNext()) { @@ -360,7 +406,12 @@ }//synchronized } + public void memberAdded(Member member) { + //do nothing + } + public void memberDisappeared(Member member) { + mapMembers.remove(member); //todo move all sessions that are primary here to and have this member as //a backup Iterator i = super.entrySet().iterator(); @@ -380,14 +431,13 @@ int currentNode = 0; public Member getNextBackupNode() { - Member[] members = channel.getMembers(); - if ( members.length == 0 ) return null; + if ( mapMembers.size() == 0 ) return null; int node = currentNode++; - if ( node >= members.length ) { + if ( node >= mapMembers.size() ) { node = 0; currentNode = 0; } - return members[node]; + return (Member)mapMembers.get(node); } @@ -411,7 +461,7 @@ //publish the data out to all nodes MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PROXY, false, (Serializable) key, null, null, backup); - channel.send(channel.getMembers(), msg); + channel.send((Member[])mapMembers.toArray(new Member[mapMembers.size()]), msg); //publish the backup data to one node msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false, @@ -469,13 +519,14 @@ Object old = null; //make sure that any old values get removed - if ( containsKey(key) ) old = (MapEntry)remove(key); + if ( containsKey(key) ) old = remove(key); try { Member backup = publishEntryInfo(key, value); entry.setBackupNode(backup); } catch (ChannelException x) { log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); } + System.out.println("adding key="+key+" entry="+entry); super.put(key,entry); return old; } @@ -719,6 +770,8 @@ 
public static final int MSG_PROXY = 3; public static final int MSG_REMOVE = 4; public static final int MSG_STATE = 5; + public static final int MSG_START = 6; + public static final int MSG_STOP = 7; private byte[] mapId; private int msgtype; @@ -767,6 +820,10 @@ return node; } + private void setBackUpNode(Member node) { + this.node = node; + } + public byte[] getMapId() { return mapId; } @@ -811,6 +868,14 @@ if ( d.length > 0 ) node = McastMember.getMember(d); break; } + case MSG_START : + MSG_STOP :{ + byte[] d = new byte[in.readInt()]; + in.read(d); + if ( d.length > 0 ) node = McastMember.getMember(d); + break; + } + }//switch }//readExternal @@ -845,6 +910,13 @@ } case MSG_PROXY: { out.writeObject(key); + byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; + out.writeInt(d.length); + out.write(d); + break; + } + case MSG_START: + MSG_STOP : { byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; out.writeInt(d.length); out.write(d); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=385300&r1=385299&r2=385300&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Sun Mar 12 07:39:15 2006 @@ -97,6 +97,7 @@ public void messageReceived(Serializable msg, Member sender) { RpcMessage rmsg = (RpcMessage)msg; +System.out.println("Received RPC message with message:"+rmsg.message); RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); if ( rmsg.reply ) { RpcCollector collector = (RpcCollector)responseMap.get(key); Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=385300&r1=385299&r2=385300&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Sun Mar 12 07:39:15 2006 @@ -65,7 +65,6 @@ } public void messageReceived(Serializable msg, Member source) { - System.out.println("Recieved: "+msg); table.dataModel.getValueAt(-1,-1); } @@ -135,7 +134,7 @@ public static class SimpleTableDemo extends JPanel implements ActionListener{ - private static int WIDTH = 500; + private static int WIDTH = 550; private LazyReplicatedMap map; private boolean DEBUG = false; @@ -217,7 +216,7 @@ //create a add value button JPanel addpanel = new JPanel(); - addpanel.setPreferredSize(new Dimension(WIDTH,20)); + addpanel.setPreferredSize(new Dimension(WIDTH,30)); addpanel.add(createButton("Add","add")); addpanel.add(txtAddKey); addpanel.add(txtAddValue); @@ -225,7 +224,7 @@ //create a remove value button JPanel removepanel = new JPanel( ); - removepanel.setPreferredSize(new Dimension(WIDTH,20)); + removepanel.setPreferredSize(new Dimension(WIDTH,30)); removepanel.add(createButton("Remove","remove")); removepanel.add(txtRemoveKey); @@ -236,7 +235,7 @@ changepanel.add(createButton("Change","change")); changepanel.add(txtChangeKey); changepanel.add(txtChangeValue); - changepanel.setPreferredSize(new Dimension(WIDTH,20)); + changepanel.setPreferredSize(new Dimension(WIDTH,30)); add(changepanel); @@ -244,7 +243,7 @@ JPanel syncpanel = new JPanel( ); syncpanel.add(createButton("Synchronize","sync")); syncpanel.add(createButton("Replicate","replicate")); - syncpanel.setPreferredSize(new 
Dimension(WIDTH,20)); + syncpanel.setPreferredSize(new Dimension(WIDTH,30)); add(syncpanel); @@ -323,6 +322,7 @@ //Display the window. frame.setSize(450,250); + newContentPane.setSize(450,300); frame.pack(); frame.setVisible(true); return newContentPane; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]