Author: fhanik Date: Wed Mar 15 11:16:58 2006 New Revision: 386140 URL: http://svn.apache.org/viewcvs?rev=386140&view=rev Log: Refactored member to be memberimpl, as it is not specific to multicast
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java - copied, changed from r385742, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=386140&r1=386139&r2=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Wed Mar 15 11:16:58 2006 @@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; -import org.apache.catalina.tribes.mcast.McastMember; +import org.apache.catalina.tribes.mcast.MemberImpl; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import org.apache.catalina.tribes.util.UUIDGenerator; @@ -148,7 +148,7 @@ * @return byte[] */ public byte[] getDataPackage() { - byte[] addr = ((McastMember)address).getData(false); + byte[] addr = ((MemberImpl)address).getData(false); int length = 4 + //options 8 + //timestamp off=4 @@ -193,7 +193,7 @@ byte[] addr = new byte[XByteBuffer.toInt(b,offset)]; offset += 4; //addr length System.arraycopy(b,offset,addr,0,addr.length); - data.setAddress(McastMember.getMember(addr)); + data.setAddress(MemberImpl.getMember(addr)); offset += addr.length; //addr data data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)],false); offset += 4; //message length Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java?rev=386140&r1=386139&r2=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java Wed Mar 15 11:16:58 2006 @@ -37,13 +37,13 @@ */ public class McastMembership { - protected static final McastMember[] EMPTY_MEMBERS = new McastMember[0]; + protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0]; /** * The name of this membership, has to be the same as the name for the local * member */ - protected McastMember local; + protected MemberImpl local; /** * A map of all the members in the cluster. @@ -53,7 +53,7 @@ /** * A list of all the members in the cluster. */ - protected McastMember[] members = EMPTY_MEMBERS; + protected MemberImpl[] members = EMPTY_MEMBERS; /** * sort members by alive time @@ -64,7 +64,7 @@ * Constructs a new membership * @param name - has to be the name of the local member. Used to filter the local member from the cluster membership */ - public McastMembership(McastMember local) { + public McastMembership(MemberImpl local) { this.local = local; } @@ -84,7 +84,7 @@ * @return - true if this member is new to the cluster, false otherwise. * @return - false if this member is the local member or updated. */ - public synchronized boolean memberAlive(McastMember member) { + public synchronized boolean memberAlive(MemberImpl member) { boolean result = false; //ignore ourselves if ( member.equals(local) ) return result; @@ -98,7 +98,7 @@ result = true; } else { //update the member alive time - McastMember updateMember = entry.getMember() ; + MemberImpl updateMember = entry.getMember() ; if(updateMember.getMemberAliveTime() != member.getMemberAliveTime()) { updateMember.setMemberAliveTime(member.getMemberAliveTime()); Arrays.sort(members, memberComparator); @@ -113,10 +113,10 @@ * Add a member to this component and sort array with memberComparator * @param member The member to add */ - protected void addMcastMember(McastMember member) { + protected void addMcastMember(MemberImpl member) { synchronized (members) { - McastMember results[] = - new McastMember[members.length + 1]; + MemberImpl results[] = + new MemberImpl[members.length + 1]; for (int i = 0; i < members.length; i++) results[i] = members[i]; results[members.length] = member; @@ -130,7 +130,7 @@ * * @param member The member to remove */ - protected void removeMcastMember(McastMember member) { + protected void removeMcastMember(MemberImpl member) { synchronized (members) { int n = -1; for (int i = 0; i < members.length; i++) { @@ -141,8 +141,8 @@ } if (n < 0) return; - McastMember results[] = - new McastMember[members.length - 1]; + MemberImpl results[] = + new MemberImpl[members.length - 1]; int j = 0; for (int i = 0; i < members.length; i++) { if (i != n) @@ -159,7 +159,7 @@ * @param maxtime - the max time a member can remain unannounced before it is considered dead. * @return the list of expired members */ - public synchronized McastMember[] expire(long maxtime) { + public synchronized MemberImpl[] expire(long maxtime) { if(!hasMembers() ) return EMPTY_MEMBERS; @@ -175,7 +175,7 @@ } if(list != null) { - McastMember[] result = new McastMember[list.size()]; + MemberImpl[] result = new MemberImpl[list.size()]; list.toArray(result); for( int j=0; j<result.length; j++) { map.remove(result[j]); @@ -195,9 +195,9 @@ } - public synchronized McastMember getMember(Member mbr) { + public synchronized MemberImpl getMember(Member mbr) { if(hasMembers()) { - McastMember result = null; + MemberImpl result = null; for ( int i=0; i<this.members.length && result==null; i++ ) { if ( members[i].equals(mbr) ) result = members[i]; }//for @@ -211,7 +211,7 @@ * Returning a list of all the members in the membership * We not need a copy: add and remove generate new arrays. */ - public synchronized McastMember[] getMembers() { + public synchronized MemberImpl[] getMembers() { if(hasMembers()) { return members; } else { @@ -238,13 +238,13 @@ public int compare(Object o1, Object o2) { try { - return compare((McastMember) o1, (McastMember) o2); + return compare((MemberImpl) o1, (MemberImpl) o2); } catch (ClassCastException x) { return 0; } } - public int compare(McastMember m1, McastMember m2) { + public int compare(MemberImpl m1, MemberImpl m2) { //longer alive time, means sort first long result = m2.getMemberAliveTime() - m1.getMemberAliveTime(); if (result < 0) @@ -262,10 +262,10 @@ protected static class MbrEntry { - protected McastMember mbr; + protected MemberImpl mbr; protected long lastHeardFrom; - public MbrEntry(McastMember mbr) { + public MbrEntry(MemberImpl mbr) { this.mbr = mbr; } @@ -279,7 +279,7 @@ /** * Return the actual McastMember object */ - public McastMember getMember() { + public MemberImpl getMember() { return mbr; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java?rev=386140&r1=386139&r2=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java Wed Mar 15 11:16:58 2006 @@ -72,7 +72,7 @@ /** * The local member */ - protected McastMember localMember ; + protected MemberImpl localMember ; private int mcastSoTimeout; private int mcastTTL; @@ -246,7 +246,7 @@ int port = Integer.parseInt(getProperties().getProperty("tcpListenPort")); if ( localMember == null ) { - localMember = new McastMember(domain, host, port, 100); + localMember = new MemberImpl(domain, host, port, 100); } else { localMember.setDomain(domain); localMember.setHostname(host); @@ -275,7 +275,7 @@ } } - impl = new McastServiceImpl((McastMember)localMember,Long.parseLong(properties.getProperty("msgFrequency")), + impl = new McastServiceImpl((MemberImpl)localMember,Long.parseLong(properties.getProperty("msgFrequency")), Long.parseLong(properties.getProperty("memberDropTime")), Integer.parseInt(properties.getProperty("mcastPort")), bind, Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java?rev=386140&r1=386139&r2=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java Wed Mar 15 11:16:58 2006 @@ -50,7 +50,7 @@ /** * The local member that we intend to broad cast over and over again */ - protected McastMember member; + protected MemberImpl member; /** * The multicast address */ @@ -113,7 +113,7 @@ * @throws IOException */ public McastServiceImpl( - McastMember member, + MemberImpl member, long sendFrequency, long expireTime, int port, @@ -208,7 +208,7 @@ socket.receive(receivePacket); byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); - McastMember m = McastMember.getMember(data); + MemberImpl m = MemberImpl.getMember(data); if(log.isDebugEnabled()) log.debug("Mcast receive ping from member " + m); @@ -217,7 +217,7 @@ log.debug("Mcast add member " + m); service.memberAdded(m); } - McastMember[] expired = membership.expire(timeToExpiration); + MemberImpl[] expired = membership.expire(timeToExpiration); for ( int i=0; i<expired.length; i++) { if(log.isDebugEnabled()) log.debug("Mcast exipre member " + m); Copied: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java (from r385742, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java) URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java&r1=385742&r2=386140&rev=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java Wed Mar 15 11:16:58 2006 @@ -33,7 +33,7 @@ * @author Peter Rossbach * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ -public class McastMember implements Member, java.io.Externalizable { +public class MemberImpl implements Member, java.io.Externalizable { /** * Digits, used for "superfast" de-serialization of an @@ -87,7 +87,7 @@ /** * Empty constructor for serialization */ - public McastMember() { + public MemberImpl() { } @@ -98,7 +98,7 @@ * @param host - the tcp listen host * @param port - the tcp listen port */ - public McastMember(String domain, + public MemberImpl(String domain, String host, int port, long aliveTime) throws IOException { @@ -117,10 +117,10 @@ */ public java.util.HashMap getMemberProperties() { java.util.HashMap map = new java.util.HashMap(2); - map.put(McastMember.TCP_LISTEN_HOST,this.host); - map.put(McastMember.TCP_LISTEN_PORT,String.valueOf(this.port)); - map.put(McastMember.MEMBER_NAME,getName()); - map.put(McastMember.MEMBER_DOMAIN,domain); + map.put(MemberImpl.TCP_LISTEN_HOST,this.host); + map.put(MemberImpl.TCP_LISTEN_PORT,String.valueOf(this.port)); + map.put(MemberImpl.MEMBER_NAME,getName()); + map.put(MemberImpl.MEMBER_DOMAIN,domain); return map; } @@ -187,7 +187,7 @@ * @param data - the bytes received * @return a member object. */ - public static McastMember getMember(byte[] data, McastMember member) { + public static MemberImpl getMember(byte[] data, MemberImpl member) { //package looks like //alive - 8 bytes //port - 4 bytes @@ -214,8 +214,8 @@ return member; } - public static McastMember getMember(byte[] data) { - return getMember(data,new McastMember()); + public static MemberImpl getMember(byte[] data) { + return getMember(data,new MemberImpl()); } /** @@ -303,9 +303,9 @@ * @param o */ public boolean equals(Object o) { - if ( o instanceof McastMember ) { - return Arrays.equals(this.getHost(),((McastMember)o).getHost()) && - this.getPort() == ((McastMember)o).getPort(); + if ( o instanceof MemberImpl ) { + return Arrays.equals(this.getHost(),((MemberImpl)o).getHost()) && + this.getPort() == ((MemberImpl)o).getPort(); } else return false; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386140&r1=386139&r2=386140&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Wed Mar 15 11:16:58 2006 @@ -39,8 +39,7 @@ import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.io.DirectByteArrayOutputStream; import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.mcast.McastMember; -import org.apache.catalina.tribes.tcp.*; +import org.apache.catalina.tribes.mcast.MemberImpl; /** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. @@ -137,16 +136,19 @@ private void init(Channel channel, String mapContextName, long timeout) { final String chset = "ISO-8859-1"; + this.channel = channel; this.rpcTimeout = timeout; + try { + //unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(chset); }catch (UnsupportedEncodingException x) { log.warn("Unable to encode mapContextName["+mapContextName+"] using getBytes("+chset+") using default getBytes()",x); this.mapContextName = mapContextName.getBytes(); } - //create an rpc channel + //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); this.channel.addChannelListener(this); this.channel.addMembershipListener(this); @@ -196,7 +198,18 @@ super.clear(); this.stateTransferred = false; } - + + +//------------------------------------------------------------------------------ +// GROUP COM INTERFACES +//------------------------------------------------------------------------------ + public Member[] getMapMembers() { + synchronized (mapMembers) { + Member[] result = new Member[mapMembers.size()]; + mapMembers.toArray(result); + return result; + } + } /** * Replicates any changes to the object since the last time * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> @@ -258,12 +271,10 @@ } -//------------------------------------------------------------------------------ -// GROUP COM INTERFACES -//------------------------------------------------------------------------------ public void transferState() { try { - Member backup = mapMembers.size()>0?(Member)mapMembers.get(0):null; + Member[] members = getMapMembers(); + Member backup = members.length>0?(Member)members[0]:null; if ( backup != null ) { MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_STATE,false, null,null,null,null); @@ -423,7 +434,9 @@ public void mapMemberAdded(Member member) { //select a backup node if we don't have one - mapMembers.add(member); + synchronized (mapMembers) { + mapMembers.add(member); + } synchronized (stateMutex) { Iterator i = super.entrySet().iterator(); while (i.hasNext()) { @@ -446,7 +459,9 @@ } public void memberDisappeared(Member member) { - mapMembers.remove(member); + synchronized (mapMembers) { + mapMembers.remove(member); + } //todo move all sessions that are primary here to and have this member as //a backup Iterator i = super.entrySet().iterator(); @@ -466,13 +481,14 @@ int currentNode = 0; public Member getNextBackupNode() { - if ( mapMembers.size() == 0 ) return null; + Member[] members = getMapMembers(); + if ( members.length == 0 ) return null; int node = currentNode++; - if ( node >= mapMembers.size() ) { + if ( node >= members.length ) { node = 0; currentNode = 0; } - return (Member)mapMembers.get(node); + return members[node]; } @@ -496,7 +512,7 @@ //publish the data out to all nodes MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PROXY, false, (Serializable) key, null, null, backup); - channel.send((Member[])mapMembers.toArray(new Member[mapMembers.size()]), msg); + channel.send(getMapMembers(), msg); //publish the backup data to one node msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false, @@ -509,6 +525,7 @@ MapEntry entry = (MapEntry)super.get(key); if ( entry == null ) return null; if ( !entry.isPrimary() ) { + //if the message is not primary, we need to retrieve the latest value try { MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null); @@ -516,7 +533,7 @@ msg, this.rpcChannel.FIRST_REPLY, rpcTimeout); if (resp == null || resp.length == 0) { //no responses - log.warn("Unable to retrieve object for key:" + key); + log.warn("Unable to retrieve remote object for key:" + key); return null; } msg = (MapMessage) resp[0].getMessage(); @@ -537,7 +554,6 @@ entry.setProxy(false); entry.setValue(msg.getValue()); - } catch (ChannelException x) { log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); return null; @@ -546,10 +562,18 @@ return entry.getValue(); } + /** + * Returns true if the key has an entry in the map. + * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> + * will make this entry primary for the group + * @param key Object + * @return boolean + */ public boolean containsKey(Object key) { return super.containsKey(key); } + public Object put(Object key, Object value) { if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName()); if ( value == null ) return remove(key); @@ -574,7 +598,10 @@ } - + /** + * Copies all values from one map to this instance + * @param m Map + */ public void putAll(Map m) { Iterator i = m.entrySet().iterator(); while ( i.hasNext() ) { @@ -582,12 +609,18 @@ put(entry.getKey(),entry.getValue()); } } - + + /** + * Removes an object from this map, it will also remove it from + * + * @param key Object + * @return Object + */ public Object remove(Object key) { MapEntry entry = (MapEntry)super.remove(key); MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null); try { - channel.send(channel.getMembers(), msg); + channel.send(getMapMembers(), msg); } catch ( ChannelException x ) { log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x); } @@ -891,7 +924,7 @@ }//endif byte[] d = new byte[in.readInt()]; in.read(d); - if ( d.length > 0 ) node = McastMember.getMember(d); + if ( d.length > 0 ) node = MemberImpl.getMember(d); break; } case MSG_RETRIEVE_BACKUP: { @@ -907,14 +940,14 @@ key = (Serializable)in.readObject(); byte[] d = new byte[in.readInt()]; in.read(d); - if ( d.length > 0 ) node = McastMember.getMember(d); + if ( d.length > 0 ) node = MemberImpl.getMember(d); break; } case MSG_START : MSG_STOP :{ byte[] d = new byte[in.readInt()]; in.read(d); - if ( d.length > 0 ) node = McastMember.getMember(d); + if ( d.length > 0 ) node = MemberImpl.getMember(d); break; } @@ -936,7 +969,7 @@ } else { out.writeObject(value); }//endif - byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; + byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0]; out.writeInt(d.length); out.write(d); break; @@ -952,14 +985,14 @@ } case MSG_PROXY: { out.writeObject(key); - byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; + byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0]; out.writeInt(d.length); out.write(d); break; } case MSG_START: MSG_STOP : { - byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; + byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0]; out.writeInt(d.length); out.write(d); break; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]