Author: fhanik Date: Mon Feb 18 10:50:35 2008 New Revision: 628843 URL: http://svn.apache.org/viewvc?rev=628843&view=rev Log: implement setter of secure and UDP ports
Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/trunk/webapps/docs/config/cluster-receiver.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java Mon Feb 18 10:50:35 2008 @@ -59,6 +59,12 @@ public int getSecurePort(); /** + * Returns the UDP port + * @return port, -1 if the UDP port is not activated. 
+ */ + public int getUdpPort(); + + /** * Sets the message listener to receive notification of incoming * @param listener MessageListener * @see MessageListener Modified: tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java Mon Feb 18 10:50:35 2008 @@ -109,7 +109,7 @@ /** * Sets the local member properties for broadcasting */ - public void setLocalMemberProperties(String listenHost, int listenPort); + public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort); /** * Sets the membership listener, only one listener can be added. Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java Mon Feb 18 10:50:35 2008 @@ -141,7 +141,10 @@ clusterReceiver.setMessageListener(this); clusterReceiver.start(); //synchronize, big time FIXME - membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort()); + membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), + getClusterReceiver().getPort(), + getClusterReceiver().getSecurePort(), + getClusterReceiver().getUdpPort()); valid = true; } if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) { Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java Mon Feb 18 10:50:35 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -70,9 +70,9 @@ protected MemberImpl localMember ; private int mcastSoTimeout; private int mcastTTL; - + protected byte[] payload; - + protected byte[] domain; /** @@ -95,7 +95,7 @@ public String getInfo() { return (info); } - + /** * * @param properties @@ -132,7 +132,7 @@ public String getLocalMemberName() { return localMember.toString() ; } - + /** * Return the local member */ @@ -140,13 +140,15 @@ if ( alive && localMember != null && impl != null) localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime()); return localMember; } - + /** * Sets the local member properties for broadcasting */ - public void setLocalMemberProperties(String listenHost, int listenPort) { + public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) { properties.setProperty("tcpListenHost",listenHost); properties.setProperty("tcpListenPort",String.valueOf(listenPort)); + properties.setProperty("udpListenPort",String.valueOf(udpPort)); + 
properties.setProperty("tcpSecurePort",String.valueOf(securePort)); try { if (localMember != null) { localMember.setHostname(listenHost); @@ -157,16 +159,18 @@ localMember.setPayload(getPayload()); localMember.setDomain(getDomain()); } + localMember.setSecurePort(securePort); + localMember.setUdpPort(udpPort); localMember.getData(true, true); }catch ( IOException x ) { throw new IllegalArgumentException(x); } } - + public void setAddress(String addr) { properties.setProperty("mcastAddress", addr); } - + /** * @deprecated use setAddress * @param addr String @@ -174,11 +178,11 @@ public void setMcastAddr(String addr) { setAddress(addr); } - + public String getAddress() { return properties.getProperty("mcastAddress"); } - + /** * @deprecated use getAddress * @return String @@ -190,7 +194,7 @@ public void setMcastBindAddress(String bindaddr) { setBind(bindaddr); } - + public void setBind(String bindaddr) { properties.setProperty("mcastBindAddress", bindaddr); } @@ -217,7 +221,7 @@ public void setPort(int port) { properties.setProperty("mcastPort", String.valueOf(port)); } - + public void setRecoveryCounter(int recoveryCounter) { properties.setProperty("recoveryCounter", String.valueOf(recoveryCounter)); } @@ -242,7 +246,7 @@ String p = properties.getProperty("mcastPort"); return new Integer(p).intValue(); } - + /** * @deprecated use setFrequency * @param time long @@ -250,7 +254,7 @@ public void setMcastFrequency(long time) { setFrequency(time); } - + public void setFrequency(long time) { properties.setProperty("mcastFrequency", String.valueOf(time)); } @@ -274,7 +278,7 @@ public void setDropTime(long time) { properties.setProperty("memberDropTime", String.valueOf(time)); } - + /** * @deprecated use getDropTime * @return long @@ -305,7 +309,7 @@ start(MembershipService.MBR_RX); start(MembershipService.MBR_TX); } - + public void start(int level) throws java.lang.Exception { hasProperty(properties,"mcastPort"); hasProperty(properties,"mcastAddress"); @@ -313,6 +317,9 @@ 
hasProperty(properties,"mcastFrequency"); hasProperty(properties,"tcpListenPort"); hasProperty(properties,"tcpListenHost"); + hasProperty(properties,"tcpSecurePort"); + hasProperty(properties,"udpListenPort"); + if ( impl != null ) { impl.start(level); @@ -320,7 +327,9 @@ } String host = getProperties().getProperty("tcpListenHost"); int port = Integer.parseInt(getProperties().getProperty("tcpListenPort")); - + int securePort = Integer.parseInt(getProperties().getProperty("tcpSecurePort")); + int udpPort = Integer.parseInt(getProperties().getProperty("udpListenPort")); + if ( localMember == null ) { localMember = new MemberImpl(host, port, 100); localMember.setUniqueId(UUIDGenerator.randomUUID(true)); @@ -329,6 +338,8 @@ localMember.setPort(port); localMember.setMemberAliveTime(100); } + localMember.setSecurePort(securePort); + localMember.setUdpPort(udpPort); if ( this.payload != null ) localMember.setPayload(payload); if ( this.domain != null ) localMember.setDomain(domain); localMember.setServiceStartTime(System.currentTimeMillis()); @@ -363,19 +374,19 @@ this); String value = properties.getProperty("recoveryEnabled","true"); boolean recEnabled = Boolean.valueOf(value).booleanValue() ; - impl.setRecoveryEnabled(recEnabled); + impl.setRecoveryEnabled(recEnabled); int recCnt = Integer.parseInt(properties.getProperty("recoveryCounter","10")); impl.setRecoveryCounter(recCnt); long recSlpTime = Long.parseLong(properties.getProperty("recoverySleepTime","5000")); impl.setRecoverySleepTime(recSlpTime); - - + + impl.start(level); - + } - + /** * Stop broadcasting and listening to membership pings */ @@ -403,7 +414,7 @@ membernames = new String[0] ; return membernames ; } - + /** * Return the member by name */ @@ -423,7 +434,7 @@ if ( impl == null || impl.membership == null ) return false; return impl.membership.hasMembers(); } - + public Member getMember(Member mbr) { if ( impl == null || impl.membership == null ) return null; return impl.membership.getMember(mbr); @@ 
-472,11 +483,11 @@ public int getMcastSoTimeout() { return getSoTimeout(); } - + public int getSoTimeout() { return mcastSoTimeout; } - + /** * @deprecated use setSoTimeout * @param mcastSoTimeout int @@ -484,12 +495,12 @@ public void setMcastSoTimeout(int mcastSoTimeout) { setSoTimeout(mcastSoTimeout); } - + public void setSoTimeout(int mcastSoTimeout) { this.mcastSoTimeout = mcastSoTimeout; properties.setProperty("mcastSoTimeout", String.valueOf(mcastSoTimeout)); } - + /** * @deprecated use getTtl * @return int @@ -497,7 +508,7 @@ public int getMcastTTL() { return getTtl(); } - + public int getTtl() { return mcastTTL; } @@ -505,11 +516,11 @@ public byte[] getPayload() { return payload; } - + public byte[] getDomain() { return domain; } - + /** * @deprecated use setTtl * @param mcastTTL int @@ -535,7 +546,7 @@ } } } - + public void setDomain(byte[] domain) { this.domain = domain; if ( localMember != null ) { @@ -555,7 +566,7 @@ * @throws Exception If an error occurs */ public static void main(String args[]) throws Exception { - if(log.isInfoEnabled()) + if(log.isInfoEnabled()) log.info("Usage McastService hostname tcpport"); McastService service = new McastService(); java.util.Properties p = new java.util.Properties(); Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java Mon Feb 18 10:50:35 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -47,10 +47,10 @@ public static final transient String TCP_LISTEN_PORT = "tcpListenPort"; public static final transient String TCP_LISTEN_HOST = "tcpListenHost"; public static final transient String MEMBER_NAME = "memberName"; - + public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66, 1, 0}; public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69, 1, 0}; - + /** * The listen host for this member */ @@ -64,7 +64,7 @@ * The udp listen port for this member */ protected int udpPort = -1; - + /** * The tcp/SSL listen port for this member */ @@ -79,12 +79,12 @@ * created, is kept track of using the start time */ protected long memberAliveTime = 0; - + /** * For the local member only */ protected transient long serviceStartTime; - + /** * To avoid serialization over and over again, once the local dataPkg * has been set, we use that to transmit data @@ -95,13 +95,13 @@ * Unique session Id for this member */ protected byte[] uniqueId = new byte[16]; - + /** * Custom payload that an app framework can broadcast * Also used to transport stop command. */ protected byte[] payload = new byte[0]; - + /** * Command, so that the custom payload doesn't have to be used * This is for internal tribes use, such as SHUTDOWN_COMMAND @@ -112,12 +112,12 @@ * Domain if we want to filter based on domain. 
*/ protected byte[] domain = new byte[0]; - + /** * Empty constructor for serialization */ public MemberImpl() { - + } /** @@ -134,7 +134,7 @@ this.port = port; this.memberAliveTime=aliveTime; } - + public MemberImpl(String host, int port, long aliveTime, @@ -142,7 +142,7 @@ this(host,port,aliveTime); setPayload(payload); } - + public boolean isReady() { return SenderState.getSenderState(this).isReady(); } @@ -178,8 +178,8 @@ public byte[] getData(boolean getalive) { return getData(getalive,false); } - - + + public int getDataLength() { return TRIBES_MBR_BEGIN.length+ //start pkg 4+ //data length @@ -198,9 +198,9 @@ payload.length+ //payload TRIBES_MBR_END.length; //end pkg } - + /** - * + * * @param getalive boolean - calculate memberAlive time * @param reset boolean - reset the cached data package, and create a new one * @return byte[] @@ -217,7 +217,7 @@ } return dataPkg; } - + //package looks like //start package TRIBES_MBR_BEGIN.length //package length - 4 bytes @@ -239,19 +239,19 @@ long alive=System.currentTimeMillis()-getServiceStartTime(); byte hl = (byte)addr.length; byte[] data = new byte[getDataLength()]; - + int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); - + int pos = 0; - + //TRIBES_MBR_BEGIN System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); pos += TRIBES_MBR_BEGIN.length; - + //body length XByteBuffer.toBytes(bodylength,data,pos); pos += 4; - + //alive data XByteBuffer.toBytes((long)alive,data,pos); pos += 8; @@ -289,7 +289,7 @@ pos+=4; System.arraycopy(payload,0,data,pos,payload.length); pos+=payload.length; - + //TRIBES_MBR_END System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); pos += TRIBES_MBR_END.length; @@ -314,7 +314,7 @@ //alive - 8 bytes //port - 4 bytes //secure port - 4 bytes - //udp port - 4 bytes + //udp port - 4 bytes //host length - 1 byte //host - hl bytes //clen - 4 bytes @@ -327,7 +327,7 @@ //end package TRIBES_MBR_END.length int pos = offset; - + if 
(XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)); } @@ -335,16 +335,16 @@ if ( length < (TRIBES_MBR_BEGIN.length+4) ) { throw new ArrayIndexOutOfBoundsException("Member package to small to validate."); } - + pos += TRIBES_MBR_BEGIN.length; - + int bodylength = XByteBuffer.toInt(data,pos); pos += 4; - + if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package."); } - + int endpos = pos+bodylength; if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)); @@ -357,7 +357,7 @@ byte[] portd = new byte[4]; System.arraycopy(data, pos, portd, 0, 4); pos += 4; - + byte[] sportd = new byte[4]; System.arraycopy(data, pos, sportd, 0, 4); pos += 4; @@ -366,37 +366,37 @@ System.arraycopy(data, pos, uportd, 0, 4); pos += 4; - + byte hl = data[pos++]; byte[] addr = new byte[hl]; System.arraycopy(data, pos, addr, 0, hl); pos += hl; - + int cl = XByteBuffer.toInt(data, pos); pos += 4; - + byte[] command = new byte[cl]; System.arraycopy(data, pos, command, 0, command.length); pos += command.length; - + int dl = XByteBuffer.toInt(data, pos); pos += 4; - + byte[] domain = new byte[dl]; System.arraycopy(data, pos, domain, 0, domain.length); pos += domain.length; - + byte[] uniqueId = new byte[16]; System.arraycopy(data, pos, uniqueId, 0, 16); pos += 16; - + int pl = XByteBuffer.toInt(data, pos); pos += 4; - + byte[] payload = new byte[pl]; System.arraycopy(data, pos, payload, 0, payload.length); pos += payload.length; - + member.setHost(addr); member.setPort(XByteBuffer.toInt(portd, 0)); member.setSecurePort(XByteBuffer.toInt(sportd, 0)); @@ -406,10 +406,10 @@ member.payload = payload; 
member.domain = domain; member.command = command; - + member.dataPkg = new byte[length]; System.arraycopy(data, offset, member.dataPkg, 0, length); - + return member; } @@ -428,7 +428,7 @@ public String getName() { return "tcp://"+getHostname()+":"+getPort(); } - + /** * Return the listen port of this member * @return - tcp listen port @@ -444,14 +444,14 @@ public byte[] getHost() { return host; } - + public String getHostname() { if ( this.hostname != null ) return hostname; else { try { if (DO_DNS_LOOKUPS) this.hostname = java.net.InetAddress.getByAddress(host).getHostName(); - else + else this.hostname = org.apache.catalina.tribes.util.Arrays.toString(host); return this.hostname; }catch ( IOException x ) { @@ -493,9 +493,9 @@ public int getSecurePort() { return securePort; } - + public int getUdpPort() { - return udpPort; + return udpPort; } public void setMemberAliveTime(long time) { @@ -512,7 +512,9 @@ buf.append(getName()).append(","); buf.append(getHostname()).append(","); buf.append(port).append(", alive="); - buf.append(memberAliveTime).append(","); + buf.append(memberAliveTime).append(", "); + buf.append("securePort=").append(securePort).append(", "); + buf.append("UDP Port=").append(udpPort).append(", "); buf.append("id=").append(bToS(this.uniqueId)).append(", "); buf.append("payload=").append(bToS(this.payload,8)).append(", "); buf.append("command=").append(bToS(this.command,8)).append(", "); @@ -558,16 +560,16 @@ else return false; } - + public void setHost(byte[] host) { this.host = host; } - + public void setHostname(String host) throws IOException { hostname = host; this.host = java.net.InetAddress.getByName(host).getAddress(); } - + public void setMsgCount(int msgCount) { this.msgCount = msgCount; } @@ -593,7 +595,7 @@ this.payload = oldpayload; throw new IllegalArgumentException("Payload is to large for tribes to handle."); } - + } public void setCommand(byte[] command) { @@ -610,10 +612,10 @@ this.securePort = securePort; this.dataPkg = null; } - 
+ public void setUdpPort(int port) { - this.udpPort = port; - this.dataPkg = null; + this.udpPort = port; + this.dataPkg = null; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { @@ -621,7 +623,7 @@ byte[] message = new byte[length]; in.read(message); getMember(message,this); - + } public void writeExternal(ObjectOutput out) throws IOException { @@ -629,5 +631,5 @@ out.writeInt(data.length); out.write(data); } - + } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Mon Feb 18 10:50:35 2008 @@ -47,11 +47,12 @@ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReceiverBase.class); - + private MessageListener listener; private String host = "auto"; private InetAddress bind; private int port = 4000; + private int udpPort = -1; private int securePort = -1; private int rxBufSize = 43800; private int txBufSize = 25188; @@ -74,24 +75,24 @@ private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 3000; //3 seconds private boolean useBufferPool = true; - + private ExecutorService executor; public ReceiverBase() { } - + public void start() throws IOException { if ( executor == null ) { executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); } } - + public void stop() { if ( executor != null ) executor.shutdownNow();//ignore left overs executor = null; } - + /** * getMessageListener * @@ -118,7 +119,7 @@ public int getTxBufSize() { return txBufSize; } - + /** * @deprecated use getMinThreads()/getMaxThreads() 
* @return int @@ -188,7 +189,7 @@ } return bind; } - + /** * recursive bind to find the next available port * @param socket ServerSocket @@ -219,13 +220,13 @@ } return retries; } - + public void messageDataReceived(ChannelMessage data) { if ( this.listener != null ) { if ( listener.accept(data) ) listener.messageReceived(data); } } - + public int getWorkerThreadOptions() { int options = 0; if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER; @@ -248,7 +249,7 @@ return getPort(); } - + public boolean getDirect() { return direct; } @@ -264,7 +265,7 @@ getBind(); return this.host; } - + public String getHost() { return getAddress(); } @@ -291,7 +292,7 @@ public RxTaskPool getTaskPool() { return pool; } - + /** * @deprecated use getAddress * @return String @@ -376,7 +377,7 @@ public void setTcpSelectorTimeout(long selTimeout) { setSelectorTimeout(selTimeout); } - + public void setSelectorTimeout(long selTimeout) { tcpSelectorTimeout = selTimeout; } @@ -385,7 +386,7 @@ this.listen = doListen; } - + public void setAddress(String host) { this.host = host; } @@ -478,5 +479,13 @@ public void heartbeat() { //empty operation } - + + public int getUdpPort() { + return udpPort; + } + + public void setUdpPort(int udpPort) { + this.udpPort = udpPort; + } + } Modified: tomcat/trunk/webapps/docs/config/cluster-receiver.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-receiver.xml?rev=628843&r1=628842&r2=628843&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/config/cluster-receiver.xml (original) +++ tomcat/trunk/webapps/docs/config/cluster-receiver.xml Mon Feb 18 10:50:35 2008 @@ -90,6 +90,10 @@ The secure listen port. This port is SSL enabled. If this attribute is omitted no SSL port is opened up. There default value is unset, meaning there is no SSL socket available. </attribute> + <attribute name="udpPort" required="false"> + The UDP listen port. 
If this attribute is omitted no UDP port is opened up. + The default value is unset, meaning there is no UDP listener available. + </attribute> <attribute name="selectorTimeout" required="false"> The value in milliseconds for the polling timeout in the <code>NioReceiver</code>. On older versions of the JDK there have been bugs, that should all now be cleared out where the selector never woke up. --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]