Author: fhanik Date: Fri Jul 7 16:53:44 2006 New Revision: 420015 URL: http://svn.apache.org/viewvc?rev=420015&view=rev Log: Fix in trace, and added a protocol around multicast packets as well
Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=420015&r1=420014&r2=420015&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original) +++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Fri Jul 7 16:53:44 2006 @@ -1,3 +1,6 @@ +0.9.5.1 + - create a protocol around the multicast packages, so that we can go to nio eventually + - corrected tracing 0.9.5.0 - added message tracers 0.9.4.9 Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=420015&r1=420014&r2=420015&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Jul 7 16:53:44 2006 @@ -257,10 +257,12 @@ //get the actual member with the correct alive time Member source = msg.getAddress(); boolean rx = false; + boolean delivered = false; for ( int i=0; i<channelListeners.size(); i++ ) { ChannelListener channelListener = (ChannelListener)channelListeners.get(i); if (channelListener != null && channelListener.accept(fwd, source)) { channelListener.messageReceived(fwd, source); + delivered = true; //if the message was accepted by an RPC channel, that channel //is responsible for returning the reply, otherwise we send an absence reply if ( channelListener instanceof RpcChannel ) rx = true; @@ -272,7 +274,7 @@ sendNoRpcChannelReply((RpcMessage)fwd,source); } if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("GroupChannel delivered["+rx+"] id:"+new UniqueId(msg.getUniqueId())); + Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId())); } } catch ( Exception x ) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java?rev=420015&r1=420014&r2=420015&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java Fri Jul 7 16:53:44 2006 @@ -17,6 +17,8 @@ package org.apache.catalina.tribes.membership; +import org.apache.catalina.tribes.util.Arrays; + /** * Manifest constants for the <code>org.apache.catalina.tribes.membership</code> @@ -24,10 +26,14 @@ * * @author Peter Rossbach * @version $Revision: 303950 $ $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 2005) $ + * @author Filip Hanik */ public class Constants { public static final String Package = "org.apache.catalina.tribes.membership"; - + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString("TRIBES-B".getBytes())); + System.out.println(Arrays.toString("TRIBES-E".getBytes())); + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=420015&r1=420014&r2=420015&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jul 7 16:53:44 2006 @@ -338,8 +338,8 @@ //ignore if we haven't started the sender //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return; member.inc(); - if(log.isDebugEnabled()) - log.debug("Mcast send ping from member " + member); + if(log.isTraceEnabled()) + log.trace("Mcast send ping from member " + member); byte[] data = member.getData(); DatagramPacket p = new DatagramPacket(data,data.length); p.setAddress(address); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=420015&r1=420014&r2=420015&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Fri Jul 7 16:53:44 2006 @@ -42,6 +42,9 @@ public static final transient String TCP_LISTEN_HOST = "tcpListenHost"; public static final transient String MEMBER_NAME = "memberName"; + public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66}; + public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69}; + /** * The listen host for this member */ @@ -163,7 +166,20 @@ public int getDataLength() { - return 8+4+1+host.length+4+command.length+4+domain.length+16+4+payload.length; + return TRIBES_MBR_BEGIN.length+ //start pkg + 4+ //data length + 8+ //alive time + 4+ //port + 1+ //host length + host.length+ //host + 4+ //command length + command.length+ //command + 4+ //domain length + domain.length+ //domain + 16+ //unique id + 4+ //payload length + payload.length+ //payload + TRIBES_MBR_END.length; //end pkg } /** @@ -180,12 +196,14 @@ //you'd be surprised, but System.currentTimeMillis //shows up on the profiler long alive=System.currentTimeMillis()-getServiceStartTime(); - XByteBuffer.toBytes( (long) alive, dataPkg, 0); + XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4); } return dataPkg; } //package looks like + //start package TRIBES_MBR_BEGIN.length + //package length - 4 bytes //alive - 8 bytes //port - 4 bytes //host length - 1 byte @@ -197,13 +215,26 @@ //uniqueId - 16 bytes //payload length - 4 bytes //payload plen bytes + //end package TRIBES_MBR_END.length byte[] addr = host; long alive=System.currentTimeMillis()-getServiceStartTime(); byte hl = (byte)addr.length; byte[] data = new byte[getDataLength()]; + + int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); + int pos = 0; + + //TRIBES_MBR_BEGIN + System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); + pos += TRIBES_MBR_BEGIN.length; + + //body length + XByteBuffer.toBytes(bodylength,data,pos); + pos += 4; + //alive data - XByteBuffer.toBytes((long)alive,data,0); + XByteBuffer.toBytes((long)alive,data,pos); pos += 8; //port XByteBuffer.toBytes(port,data,pos); @@ -233,6 +264,10 @@ pos+=4; System.arraycopy(payload,0,data,pos,payload.length); pos+=payload.length; + + //TRIBES_MBR_END + System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); + pos += TRIBES_MBR_END.length; //create local data dataPkg = data; @@ -248,69 +283,96 @@ } public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) { - //package looks like - //alive - 8 bytes - //port - 4 bytes - //host length - 1 byte - //host - hl bytes - //command length - 4 bytes - //command clen bytes - //domain length - 4 bytes - //domain - dlen bytes - //uniqueId - 16 bytes - //payload length - 4bytes - //payload - pl bytes - int pos = offset; - - byte[] alived = new byte[8]; - System.arraycopy(data, pos, alived, 0, 8); - pos+=8; - byte[] portd = new byte[4]; - System.arraycopy(data, pos, portd, 0, 4); - pos+=4; - - byte hl = data[pos++]; - byte[] addr = new byte[hl]; - System.arraycopy(data, pos, addr, 0, hl); - pos+=hl; - - int cl = XByteBuffer.toInt(data,pos); - pos+=4; - - byte[] command = new byte[cl]; - System.arraycopy(data, pos, command, 0, command.length); - pos+=command.length; - - int dl = XByteBuffer.toInt(data,pos); - pos+=4; - - byte[] domain = new byte[dl]; - System.arraycopy(data, pos, domain, 0, domain.length); - pos+=domain.length; - - byte[] uniqueId = new byte[16]; - System.arraycopy(data, pos, uniqueId, 0, 16); - pos+=16; - - int pl = XByteBuffer.toInt(data,pos); - pos+=4; - - byte[] payload = new byte[pl]; - System.arraycopy(data, pos, payload, 0, payload.length); - pos+=payload.length; - - member.setHost(addr); - member.setPort(XByteBuffer.toInt(portd, 0)); - member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); - member.setUniqueId(uniqueId); - member.payload = payload; - member.domain = domain; - member.command = command; - - member.dataPkg = new byte[length]; - System.arraycopy(data,offset,member.dataPkg,0,length); - - return member; + //package looks like + //start package TRIBES_MBR_BEGIN.length + //package length - 4 bytes + //alive - 8 bytes + //port - 4 bytes + //host length - 1 byte + //host - hl bytes + //clen - 4 bytes + //command - clen bytes + //dlen - 4 bytes + //domain - dlen bytes + //uniqueId - 16 bytes + //payload length - 4 bytes + //payload plen bytes + //end package TRIBES_MBR_END.length + + int pos = offset; + + if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { + throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)); + } + + if ( length < (TRIBES_MBR_BEGIN.length+4) ) { + throw new ArrayIndexOutOfBoundsException("Member package to small to validate."); + } + + pos += TRIBES_MBR_BEGIN.length; + + int bodylength = XByteBuffer.toInt(data,pos); + pos += 4; + + if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { + throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package."); + } + + int endpos = pos+bodylength; + if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { + throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)); + } + + + byte[] alived = new byte[8]; + System.arraycopy(data, pos, alived, 0, 8); + pos += 8; + byte[] portd = new byte[4]; + System.arraycopy(data, pos, portd, 0, 4); + pos += 4; + + byte hl = data[pos++]; + byte[] addr = new byte[hl]; + System.arraycopy(data, pos, addr, 0, hl); + pos += hl; + + int cl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] command = new byte[cl]; + System.arraycopy(data, pos, command, 0, command.length); + pos += command.length; + + int dl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] domain = new byte[dl]; + System.arraycopy(data, pos, domain, 0, domain.length); + pos += domain.length; + + byte[] uniqueId = new byte[16]; + System.arraycopy(data, pos, uniqueId, 0, 16); + pos += 16; + + int pl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] payload = new byte[pl]; + System.arraycopy(data, pos, payload, 0, payload.length); + pos += payload.length; + + member.setHost(addr); + member.setPort(XByteBuffer.toInt(portd, 0)); + member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); + member.setUniqueId(uniqueId); + member.payload = payload; + member.domain = domain; + member.command = command; + + member.dataPkg = new byte[length]; + System.arraycopy(data, offset, member.dataPkg, 0, length); + + return member; } public static MemberImpl getMember(byte[] data) { @@ -414,7 +476,7 @@ for (int i=0; data!=null && i<data.length; i++ ) { buf.append(String.valueOf(data[i])).append(" "); if ( i==max ) { - buf.append("..."); + buf.append("...("+data.length+")"); break; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]