Author: kfujino
Date: Thu Jan 14 07:50:02 2016
New Revision: 1724540

URL: http://svn.apache.org/viewvc?rev=1724540&view=rev
Log:
Remove dependency on TcpFailureDetector from NonBlockingCoordinator.

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=1724540&r1=1724539&r2=1724540&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Thu Jan 14 07:50:02 2016
@@ -16,6 +16,11 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.catalina.tribes.Channel;
@@ -287,13 +292,47 @@ public class NonBlockingCoordinator exte
     }
 
     protected boolean alive(Member mbr) {
-        return TcpFailureDetector.memberAlive(mbr,
-                                              COORD_ALIVE,
-                                              false,
-                                              false,
-                                              waitForCoordMsgTimeout,
-                                              waitForCoordMsgTimeout,
-                                              getOptionFlag());
+        return memberAlive(mbr, COORD_ALIVE, false, false,
+                waitForCoordMsgTimeout, waitForCoordMsgTimeout, getOptionFlag());
+    }
+
+    protected static boolean memberAlive(Member mbr, byte[] msgData,
+                                         boolean sendTest, boolean readTest,
+                                         long readTimeout, long conTimeout,
+                                         int optionFlag) {
+        //could be a shutdown notification
+        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
+
+        try (Socket socket = new Socket()) {
+            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
+            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
+            socket.setSoTimeout((int)readTimeout);
+            socket.connect(addr, (int) conTimeout);
+            if ( sendTest ) {
+                ChannelData data = new ChannelData(true);
+                data.setAddress(mbr);
+                data.setMessage(new XByteBuffer(msgData,false));
+                data.setTimestamp(System.currentTimeMillis());
+                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
+                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
+                data.setOptions(options);
+                byte[] message = XByteBuffer.createDataPackage(data);
+                socket.getOutputStream().write(message);
+                if ( readTest ) {
+                    int length = socket.getInputStream().read(message);
+                    return length > 0;
+                }
+            }//end if
+            return true;
+        } catch (SocketTimeoutException sx) {
+            //do nothing, we couldn't connect
+        } catch (ConnectException cx) {
+            //do nothing, we couldn't connect
+        } catch (Exception x) {
+            log.error(sm.getString("tcpFailureDetector.failureDetection.failed"),x);
+        }
+        return false;
     }
 
     protected Membership mergeOnArrive(CoordinationMessage msg) {

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org