Author: fhanik
Date: Mon Feb 27 10:22:16 2006
New Revision: 381403
URL: http://svn.apache.org/viewcvs?rev=381403&view=rev
Log:
Initial cleanup, getting ready to create an NIO data sender for faster
throughput.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381403&r1=381402&r2=381403&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Feb 27 10:22:16 2006
@@ -34,25 +34,23 @@
* @author Peter Rossbach
* @author Filip Hanik
* @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
- * @since 5.5.7
+ * @since 5.5.16
*/
public class DataSender implements IDataSender {
- private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
- .getLog(DataSender.class);
+ private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(DataSender.class);
/**
* The string manager for this package.
*/
- protected static StringManager sm = StringManager
- .getManager(Constants.Package);
+ protected static StringManager sm =
StringManager.getManager(Constants.Package);
// ----------------------------------------------------- Instance Variables
/**
* The descriptive information about this implementation.
*/
- private static final String info = "DataSender/2.1";
+ private static final String info = "DataSender/3.0";
/**
* receiver address
@@ -678,20 +676,17 @@
return ;
try {
createSocket();
- if (isWaitForAck())
- socket.setSoTimeout((int) ackTimeout);
+ if (isWaitForAck()) socket.setSoTimeout((int) ackTimeout);
isSocketConnected = true;
socketOpenCounter++;
this.keepAliveCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.openSocket", address
- .getHostAddress(), new Integer(port),new
Long(socketOpenCounter)));
+ log.debug(sm.getString("IDataSender.openSocket",
address.getHostAddress(), new Integer(port),new Long(socketOpenCounter)));
} catch (IOException ex1) {
socketOpenFailureCounter++ ;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.openSocket.failure",
- address.getHostAddress(), new Integer(port),new
Long(socketOpenFailureCounter)), ex1);
+ log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1);
throw ex1;
}
@@ -725,8 +720,7 @@
isSocketConnected = false;
socketCloseCounter++;
if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.closeSocket",
- address.getHostAddress(), new Integer(port),new
Long(socketCloseCounter)));
+ log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(socketCloseCounter)));
}
}
@@ -791,62 +785,48 @@
* @throws java.io.IOException
* @since 5.5.10
*/
- protected void pushMessage( ChannelMessage data)
- throws java.io.IOException {
- long time = 0 ;
- if(doProcessingStats) {
- time = System.currentTimeMillis();
- }
- boolean messageTransfered = false ;
+
+ protected void pushMessage(ChannelMessage data, boolean reconnect) throws
java.io.IOException {
synchronized(this) {
checkKeepAlive();
- if (!isConnected())
- openSocket();
- else if(keepAliveTimeout > -1)
- this.keepAliveConnectTime = System.currentTimeMillis();
+ if ( reconnect ) closeSocket();
+ if (!isConnected()) openSocket();
+ else if(keepAliveTimeout > -1) this.keepAliveConnectTime =
System.currentTimeMillis();
}
+ writeData(data);
+
+ }
+
+ protected void pushMessage( ChannelMessage data) throws
java.io.IOException {
+ long time = 0 ;
+ if(doProcessingStats) time = System.currentTimeMillis();
+ boolean messageTransfered = false ;
IOException exception = null;
try {
- writeData(data);
+ // first try with existing connection
+ pushMessage(data,false);
messageTransfered = true ;
} catch (java.io.IOException x) {
exception = x;
- if( true ) { //allow resend
+ //resend
+ dataResendCounter++;
+ if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new
Integer(port)),x);
+ try {
// second try with fresh connection
- dataResendCounter++;
- if (log.isTraceEnabled())
- log.trace(sm.getString("IDataSender.send.again",
address.getHostAddress(),
- new Integer(port)),x);
- synchronized(this) {
- closeSocket();
- openSocket();
- }
- try {
- writeData(data);
- messageTransfered = true;
- exception = null;
- } catch (IOException xx) {
- xx.fillInStackTrace();
- exception = xx;
- }
- } else
- {
- synchronized(this) {
- closeSocket();
- }
+ pushMessage(data,true);
+ messageTransfered = true;
+ exception = null;
+ } catch (IOException xx) {
+ exception = xx;
+ closeSocket();
}
} finally {
this.keepAliveCount++;
checkKeepAlive();
- if(doProcessingStats) {
- addProcessingStats(time);
- }
+ if(doProcessingStats) addProcessingStats(time);
if(messageTransfered) {
addStats(data.getMessage().length);
- if (log.isTraceEnabled()) {
- log.trace(sm.getString("IDataSender.send.message",
address.getHostAddress(),
- new Integer(port), data.getUniqueId(), new
Long(data.getMessage().length)));
- }
+ if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new
Integer(port), data.getUniqueId(), new Long(data.getMessage().length)));
} else {
dataFailureCounter++;
if ( exception != null ) throw exception;
@@ -869,8 +849,7 @@
OutputStream out = socket.getOutputStream();
out.write(XByteBuffer.createDataPackage((ClusterData)data));
out.flush();
- if (isWaitForAck())
- waitForAck(ackTimeout);
+ if (isWaitForAck()) waitForAck(ackTimeout);
} finally {
synchronized(this) {
isMessageTransferStarted = false ;
@@ -892,31 +871,20 @@
}
try {
int bytesRead = 0;
- if ( log.isTraceEnabled() )
- log.trace(sm.getString("IDataSender.ack.start",getAddress(),
new Integer(socket.getLocalPort())));
+ if ( log.isTraceEnabled() )
log.trace(sm.getString("IDataSender.ack.start",getAddress(), new
Integer(socket.getLocalPort())));
int i = socket.getInputStream().read();
while ((i != -1) && (i != 3) && bytesRead < 10) {
- if ( log.isTraceEnabled() )
-
log.trace(sm.getString("IDataSender.ack.read",getAddress(), new
Integer(socket.getLocalPort()),new Character((char) i)));
+ if ( log.isTraceEnabled() )
log.trace(sm.getString("IDataSender.ack.read",getAddress(), new
Integer(socket.getLocalPort()),new Character((char) i)));
bytesRead++;
i = socket.getInputStream().read();
}
if (i != 3) {
- if (i == -1) {
- throw new
IOException(sm.getString("IDataSender.ack.eof",getAddress(), new
Integer(socket.getLocalPort())));
- } else {
- throw new
IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new
Integer(socket.getLocalPort())));
- }
- } else {
- if (log.isTraceEnabled()) {
- log.trace(sm.getString("IDataSender.ack.receive",
getAddress(),new Integer(socket.getLocalPort())));
- }
- }
+ if (i == -1) throw new
IOException(sm.getString("IDataSender.ack.eof",getAddress(), new
Integer(socket.getLocalPort())));
+ else throw new
IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new
Integer(socket.getLocalPort())));
+ } else if (log.isTraceEnabled())
log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new
Integer(socket.getLocalPort())));
} catch (IOException x) {
missingAckCounter++;
- String errmsg = sm.getString("IDataSender.ack.missing",
getAddress(),
- new Integer(socket.getLocalPort()),
- new Long(this.ackTimeout));
+ String errmsg = sm.getString("IDataSender.ack.missing",
getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
if ( !this.isSuspect() ) {
this.setSuspect(true);
if ( log.isWarnEnabled() ) log.warn(errmsg, x);
@@ -925,9 +893,7 @@
}
throw x;
} finally {
- if(doWaitAckStats) {
- addWaitAckStats(time);
- }
+ if(doWaitAckStats) addWaitAckStats(time);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]