This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/10.1.x by this push:
new fa0e97e9fd Code clean-up - formatting. No functional change
fa0e97e9fd is described below
commit fa0e97e9fdc62fd90320869e9fa51c40dead3d0c
Author: Mark Thomas <[email protected]>
AuthorDate: Fri May 10 14:59:18 2024 +0100
Code clean-up - formatting. No functional change
---
.../catalina/tribes/transport/AbstractRxTask.java | 3 +-
.../catalina/tribes/transport/AbstractSender.java | 15 +-
.../catalina/tribes/transport/Constants.java | 8 +-
.../catalina/tribes/transport/DataSender.java | 7 +
.../tribes/transport/MultiPointSender.java | 9 +-
.../catalina/tribes/transport/PooledSender.java | 51 ++---
.../catalina/tribes/transport/ReceiverBase.java | 66 ++++---
.../tribes/transport/ReplicationTransmitter.java | 11 +-
.../catalina/tribes/transport/RxTaskPool.java | 48 ++---
.../catalina/tribes/transport/SenderState.java | 6 +-
.../catalina/tribes/transport/nio/NioReceiver.java | 196 +++++++++----------
.../tribes/transport/nio/NioReplicationTask.java | 212 ++++++++++-----------
.../catalina/tribes/transport/nio/NioSender.java | 195 +++++++++----------
.../tribes/transport/nio/ParallelNioSender.java | 141 +++++++-------
.../tribes/transport/nio/PooledParallelSender.java | 16 +-
15 files changed, 502 insertions(+), 482 deletions(-)
diff --git a/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
index 2a95f7dd53..2d17ae166b 100644
--- a/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
+++ b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
@@ -18,8 +18,7 @@ package org.apache.catalina.tribes.transport;
import org.apache.catalina.tribes.io.ListenCallback;
-public abstract class AbstractRxTask implements Runnable
-{
+public abstract class AbstractRxTask implements Runnable {
public static final int OPTION_DIRECT_BUFFER =
ReceiverBase.OPTION_DIRECT_BUFFER;
diff --git a/java/org/apache/catalina/tribes/transport/AbstractSender.java
b/java/org/apache/catalina/tribes/transport/AbstractSender.java
index ba17303123..f3b7a0a563 100644
--- a/java/org/apache/catalina/tribes/transport/AbstractSender.java
+++ b/java/org/apache/catalina/tribes/transport/AbstractSender.java
@@ -37,7 +37,7 @@ public abstract class AbstractSender implements DataSender {
private Member destination;
private InetAddress address;
private int port;
- private int maxRetryAttempts = 1;//1 resends
+ private int maxRetryAttempts = 1;// 1 resends
private int attempt;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
@@ -52,8 +52,9 @@ public abstract class AbstractSender implements DataSender {
/**
* transfers sender properties from one sender to another
+ *
* @param from AbstractSender
- * @param to AbstractSender
+ * @param to AbstractSender
*/
public static void transferProperties(AbstractSender from, AbstractSender
to) {
to.rxBufSize = from.rxBufSize;
@@ -87,19 +88,19 @@ public abstract class AbstractSender implements DataSender {
public boolean keepalive() {
boolean disconnect = false;
if (isUdpBased()) {
- disconnect = true; //always disconnect UDP, TODO optimize the
keepalive handling
- } else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) {
+ disconnect = true; // always disconnect UDP, TODO optimize the
keepalive handling
+ } else if (keepAliveCount >= 0 && requestCount > keepAliveCount) {
disconnect = true;
- } else if ( keepAliveTime >= 0 &&
(System.currentTimeMillis()-connectTime)>keepAliveTime ) {
+ } else if (keepAliveTime >= 0 && (System.currentTimeMillis() -
connectTime) > keepAliveTime) {
disconnect = true;
}
- if ( disconnect ) {
+ if (disconnect) {
disconnect();
}
return disconnect;
}
- protected void setConnected(boolean connected){
+ protected void setConnected(boolean connected) {
this.connected = connected;
}
diff --git a/java/org/apache/catalina/tribes/transport/Constants.java
b/java/org/apache/catalina/tribes/transport/Constants.java
index 07946d826e..46983a493d 100644
--- a/java/org/apache/catalina/tribes/transport/Constants.java
+++ b/java/org/apache/catalina/tribes/transport/Constants.java
@@ -19,8 +19,8 @@ package org.apache.catalina.tribes.transport;
import org.apache.catalina.tribes.io.XByteBuffer;
/**
- * Manifest constants for the <code>org.apache.catalina.tribes.transport</code>
- * package.
+ * Manifest constants for the
<code>org.apache.catalina.tribes.transport</code> package.
+ *
* @author Peter Rossbach
*/
public class Constants {
@@ -33,8 +33,8 @@ public class Constants {
/*
* Do not change any of these values!
*/
- public static final byte[] ACK_DATA = new byte[] {6, 2, 3};
- public static final byte[] FAIL_ACK_DATA = new byte[] {11, 0, 5};
+ public static final byte[] ACK_DATA = new byte[] { 6, 2, 3 };
+ public static final byte[] FAIL_ACK_DATA = new byte[] { 11, 0, 5 };
public static final byte[] ACK_COMMAND =
XByteBuffer.createDataPackage(ACK_DATA);
public static final byte[] FAIL_ACK_COMMAND =
XByteBuffer.createDataPackage(FAIL_ACK_DATA);
diff --git a/java/org/apache/catalina/tribes/transport/DataSender.java
b/java/org/apache/catalina/tribes/transport/DataSender.java
index decea50e16..96092a7d6b 100644
--- a/java/org/apache/catalina/tribes/transport/DataSender.java
+++ b/java/org/apache/catalina/tribes/transport/DataSender.java
@@ -22,6 +22,7 @@ public interface DataSender {
/**
* Connect.
+ *
* @throws IOException when an error occurs
*/
void connect() throws IOException;
@@ -38,36 +39,42 @@ public interface DataSender {
/**
* Set the receive buffer size.
+ *
* @param size the new size
*/
void setRxBufSize(int size);
/**
* Set the transmit buffer size.
+ *
* @param size the new size
*/
void setTxBufSize(int size);
/**
* Keepalive.
+ *
* @return {@code true} if kept alive
*/
boolean keepalive();
/**
* Set the socket timeout.
+ *
* @param timeout in ms
*/
void setTimeout(long timeout);
/**
* Set the amount of requests during which to keepalive.
+ *
* @param maxRequests the amount of requests
*/
void setKeepAliveCount(int maxRequests);
/**
* Set the keepalive time.
+ *
* @param keepAliveTimeInMs the time in ms
*/
void setKeepAliveTime(long keepAliveTimeInMs);
diff --git a/java/org/apache/catalina/tribes/transport/MultiPointSender.java
b/java/org/apache/catalina/tribes/transport/MultiPointSender.java
index d6b3d4633c..f6bebaa746 100644
--- a/java/org/apache/catalina/tribes/transport/MultiPointSender.java
+++ b/java/org/apache/catalina/tribes/transport/MultiPointSender.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.catalina.tribes.transport;
+
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
@@ -23,32 +24,38 @@ public interface MultiPointSender extends DataSender {
/**
* Send the specified message.
+ *
* @param destination the message destinations
- * @param data the data to send
+ * @param data the data to send
+ *
* @throws ChannelException if an error occurs
*/
void sendMessage(Member[] destination, ChannelMessage data) throws
ChannelException;
/**
* Set the maximum retry attempts.
+ *
* @param attempts the retry count
*/
void setMaxRetryAttempts(int attempts);
/**
* Configure the use of a direct buffer.
+ *
* @param directBuf {@code true} to use a direct buffer
*/
void setDirectBuffer(boolean directBuf);
/**
* Send to the specified member.
+ *
* @param member the member
*/
void add(Member member);
/**
* Stop sending to the specified member.
+ *
* @param member the member
*/
void remove(Member member);
diff --git a/java/org/apache/catalina/tribes/transport/PooledSender.java
b/java/org/apache/catalina/tribes/transport/PooledSender.java
index 457230c685..b0af0fa4c9 100644
--- a/java/org/apache/catalina/tribes/transport/PooledSender.java
+++ b/java/org/apache/catalina/tribes/transport/PooledSender.java
@@ -28,14 +28,14 @@ import org.apache.juli.logging.LogFactory;
public abstract class PooledSender extends AbstractSender implements
MultiPointSender {
private static final Log log = LogFactory.getLog(PooledSender.class);
- protected static final StringManager sm =
- StringManager.getManager(Constants.Package);
+ protected static final StringManager sm =
StringManager.getManager(Constants.Package);
private final SenderQueue queue;
private int poolSize = 25;
private long maxWait = 3000;
+
public PooledSender() {
- queue = new SenderQueue(this,poolSize);
+ queue = new SenderQueue(this, poolSize);
}
public abstract DataSender getNewDataSender();
@@ -51,7 +51,7 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
@Override
public synchronized void connect() throws IOException {
- //do nothing, happens in the socket sender itself
+ // do nothing, happens in the socket sender itself
queue.open();
setConnected(true);
}
@@ -91,8 +91,8 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
@Override
public boolean keepalive() {
- //do nothing, the pool checks on every return
- return (queue==null)?false:queue.checkIdleKeepAlive();
+ // do nothing, the pool checks on every return
+ return (queue == null) ? false : queue.checkIdleKeepAlive();
}
@Override
@@ -102,12 +102,12 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
@Override
public void remove(Member member) {
- //no op for now, should not cancel out any keys
- //can create serious sync issues
- //all TCP connections are cleared out through keepalive
- //and if remote node disappears
+ // no op for now, should not cancel out any keys
+ // can create serious sync issues
+ // all TCP connections are cleared out through keepalive
+ // and if remote node disappears
}
- // ----------------------------------------------------- Inner Class
+ // ----------------------------------------------------- Inner Class
private static class SenderQueue {
private int limit = 25;
@@ -133,6 +133,7 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
public int getLimit() {
return limit;
}
+
/**
* @param limit The limit to set.
*/
@@ -159,7 +160,7 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
public synchronized DataSender getSender(long timeout) {
long start = System.currentTimeMillis();
- while ( true ) {
+ while (true) {
if (!isOpen) {
throw new
IllegalStateException(sm.getString("pooledSender.closed.queue"));
}
@@ -172,35 +173,35 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
if (sender != null) {
inuse.add(sender);
return sender;
- }//end if
+ } // end if
long delta = System.currentTimeMillis() - start;
- if ( delta > timeout && timeout>0) {
+ if (delta > timeout && timeout > 0) {
return null;
} else {
try {
- wait(Math.max(timeout - delta,1));
- }catch (InterruptedException x){}
- }//end if
+ wait(Math.max(timeout - delta, 1));
+ } catch (InterruptedException x) {
+ }
+ } // end if
}
}
public synchronized void returnSender(DataSender sender) {
- if ( !isOpen) {
+ if (!isOpen) {
sender.disconnect();
return;
}
- //to do
+ // to do
inuse.remove(sender);
- //just in case the limit has changed
- if ( notinuse.size() < this.getLimit() ) {
+ // just in case the limit has changed
+ if (notinuse.size() < this.getLimit()) {
notinuse.add(sender);
} else {
try {
sender.disconnect();
} catch (Exception e) {
if (log.isDebugEnabled()) {
- log.debug(sm.getString(
- "PooledSender.senderDisconnectFail"), e);
+
log.debug(sm.getString("PooledSender.senderDisconnectFail"), e);
}
}
}
@@ -214,11 +215,11 @@ public abstract class PooledSender extends AbstractSender
implements MultiPointS
for (Object value : unused) {
DataSender sender = (DataSender) value;
sender.disconnect();
- }//for
+ } // for
for (Object o : used) {
DataSender sender = (DataSender) o;
sender.disconnect();
- }//for
+ } // for
notinuse.clear();
inuse.clear();
notifyAll();
diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java
b/java/org/apache/catalina/tribes/transport/ReceiverBase.java
index 98385a750b..385dd28390 100644
--- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java
+++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java
@@ -53,7 +53,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
private MessageListener listener;
private String host = "auto";
private InetAddress bind;
- private int port = 4000;
+ private int port = 4000;
private int udpPort = -1;
private int securePort = -1;
private int rxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE;
@@ -65,7 +65,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
private RxTaskPool pool;
private boolean direct = true;
private long tcpSelectorTimeout = 5000;
- //how many times to search for an available socket
+ // how many times to search for an available socket
private int autoBind = 100;
private int maxThreads = 15;
private int minThreads = 6;
@@ -78,7 +78,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
private boolean soLingerOn = true;
private int soLingerTime = 3;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
- private int timeout = 3000; //3 seconds
+ private int timeout = 3000; // 3 seconds
private boolean useBufferPool = true;
private boolean daemon = true;
private long maxIdleTime = 60000;
@@ -96,8 +96,9 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
@Override
public void start() throws IOException {
- if ( executor == null ) {
- //executor = new
ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new
LinkedBlockingQueue<Runnable>());
+ if (executor == null) {
+ // executor = new
ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new
+ // LinkedBlockingQueue<Runnable>());
String channelName = "";
if (channel.getName() != null) {
channelName = "[" + channel.getName() + "]";
@@ -114,9 +115,8 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
@Override
public void stop() {
- if ( executor != null )
- {
- executor.shutdownNow();//ignore left overs
+ if (executor != null) {
+ executor.shutdownNow();// ignore left overs
}
executor = null;
if (oname != null) {
@@ -181,14 +181,14 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
}
/**
- * Attempts to bind using the provided port and if that fails attempts to
- * bind to each of the ports from portstart to (portstart + retries -1)
- * until either there are no more ports or the bind is successful. The
- * address to bind to is obtained via a call to {link {@link #getBind()}.
- * @param socket The socket to bind
- * @param portstart Starting port for bind attempts
- * @param retries Number of times to attempt to bind (port
incremented
- * between attempts)
+ * Attempts to bind using the provided port and if that fails attempts to
bind to each of the ports from portstart
+ * to (portstart + retries -1) until either there are no more ports or the
bind is successful. The address to bind
+ * to is obtained via a call to {link {@link #getBind()}.
+ *
+ * @param socket The socket to bind
+ * @param portstart Starting port for bind attempts
+ * @param retries Number of times to attempt to bind (port incremented
between attempts)
+ *
* @throws IOException Socket bind error
*/
protected void bind(ServerSocket socket, int portstart, int retries)
throws IOException {
@@ -202,9 +202,9 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
setPort(port);
log.info(sm.getString("receiverBase.socket.bind", addr));
retries = 0;
- } catch ( IOException x) {
+ } catch (IOException x) {
retries--;
- if ( retries <= 0 ) {
+ if (retries <= 0) {
log.info(sm.getString("receiverBase.unable.bind",
addr));
throw x;
}
@@ -216,25 +216,27 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
/**
* Same as bind() except it does it for the UDP port
+ *
* @param socket The socket to bind
* @param portstart Starting port for bind attempts
- * @param retries Number of times to attempt to bind (port incremented
- * between attempts)
+ * @param retries Number of times to attempt to bind (port incremented
between attempts)
+ *
* @return int The retry count
+ *
* @throws IOException Socket bind error
*/
protected int bindUdp(DatagramSocket socket, int portstart, int retries)
throws IOException {
InetSocketAddress addr = null;
- while ( retries > 0 ) {
+ while (retries > 0) {
try {
addr = new InetSocketAddress(getBind(), portstart);
socket.bind(addr);
setUdpPort(portstart);
log.info(sm.getString("receiverBase.udp.bind", addr));
return 0;
- }catch ( IOException x) {
+ } catch (IOException x) {
retries--;
- if ( retries <= 0 ) {
+ if (retries <= 0) {
log.info(sm.getString("receiverBase.unable.bind.udp",
addr));
throw x;
}
@@ -244,7 +246,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
} catch (InterruptedException ti) {
Thread.currentThread().interrupt();
}
- retries = bindUdp(socket,portstart,retries);
+ retries = bindUdp(socket, portstart, retries);
}
}
return retries;
@@ -253,8 +255,8 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
@Override
public void messageDataReceived(ChannelMessage data) {
- if ( this.listener != null ) {
- if ( listener.accept(data) ) {
+ if (this.listener != null) {
+ if (listener.accept(data)) {
listener.messageReceived(data);
}
}
@@ -262,7 +264,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
public int getWorkerThreadOptions() {
int options = 0;
- if ( getDirect() ) {
+ if (getDirect()) {
options = options | OPTION_DIRECT_BUFFER;
}
return options;
@@ -281,7 +283,6 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
}
-
public void setDirect(boolean direct) {
this.direct = direct;
}
@@ -395,6 +396,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
public void setAddress(String host) {
this.host = host;
}
+
public void setHost(String host) {
setAddress(host);
}
@@ -413,7 +415,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
public void setAutoBind(int autoBind) {
this.autoBind = autoBind;
- if ( this.autoBind <= 0 ) {
+ if (this.autoBind <= 0) {
this.autoBind = 1;
}
}
@@ -481,7 +483,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
@Override
public void heartbeat() {
- //empty operation
+ // empty operation
}
@Override
@@ -522,6 +524,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
// ---------------------------------------------- stats of the thread pool
/**
* Return the current number of threads that are managed by the pool.
+ *
* @return the current number of threads that are managed by the pool
*/
public int getPoolSize() {
@@ -534,6 +537,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
/**
* Return the current number of threads that are in use.
+ *
* @return the current number of threads that are in use
*/
public int getActiveCount() {
@@ -546,6 +550,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
/**
* Return the total number of tasks that have ever been scheduled for
execution by the pool.
+ *
* @return the total number of tasks that have ever been scheduled for
execution by the pool
*/
public long getTaskCount() {
@@ -558,6 +563,7 @@ public abstract class ReceiverBase implements
ChannelReceiver, ListenCallback, R
/**
* Return the total number of tasks that have completed execution by the
pool.
+ *
* @return the total number of tasks that have completed execution by the
pool
*/
public long getCompletedTaskCount() {
diff --git
a/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
b/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
index 78074eda61..f4fb162533 100644
--- a/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
+++ b/java/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
@@ -29,9 +29,7 @@ import org.apache.catalina.tribes.jmx.JmxRegistry;
import org.apache.catalina.tribes.transport.nio.PooledParallelSender;
/**
- * Transmit message to other cluster members
- * Actual senders are created based on the replicationMode
- * type
+ * Transmit message to other cluster members Actual senders are created based
on the replicationMode type
*/
public class ReplicationTransmitter implements ChannelSender {
@@ -60,7 +58,7 @@ public class ReplicationTransmitter implements ChannelSender {
@Override
public void sendMessage(ChannelMessage message, Member[] destination)
throws ChannelException {
MultiPointSender sender = getTransport();
- sender.sendMessage(destination,message);
+ sender.sendMessage(destination, message);
}
@@ -101,14 +99,13 @@ public class ReplicationTransmitter implements
ChannelSender {
*/
@Override
public void heartbeat() {
- if (getTransport()!=null) {
+ if (getTransport() != null) {
getTransport().keepalive();
}
}
/**
- * add new cluster member and create sender ( s. replicationMode) transfer
- * current properties to sender
+ * add new cluster member and create sender ( s. replicationMode) transfer
current properties to sender
*
* @see
org.apache.catalina.tribes.ChannelSender#add(org.apache.catalina.tribes.Member)
*/
diff --git a/java/org/apache/catalina/tribes/transport/RxTaskPool.java
b/java/org/apache/catalina/tribes/transport/RxTaskPool.java
index d17dde5506..4d36367476 100644
--- a/java/org/apache/catalina/tribes/transport/RxTaskPool.java
+++ b/java/org/apache/catalina/tribes/transport/RxTaskPool.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.catalina.tribes.transport;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
- * A very simple thread pool class. The pool size is set at
- * construction time and remains fixed. Threads are cycled
+ * A very simple thread pool class. The pool size is set at construction time
and remains fixed. Threads are cycled
* through a FIFO idle queue.
*/
public class RxTaskPool {
@@ -38,7 +38,7 @@ public class RxTaskPool {
private final TaskCreator creator;
- public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws
Exception {
+ public RxTaskPool(int maxTasks, int minTasks, TaskCreator creator) throws
Exception {
// fill up the pool with worker threads
this.maxTasks = maxTasks;
this.minTasks = minTasks;
@@ -48,30 +48,30 @@ public class RxTaskPool {
protected void configureTask(AbstractRxTask task) {
synchronized (task) {
task.setTaskPool(this);
-// task.setName(task.getClass().getName() + "[" + inc() + "]");
-// task.setDaemon(true);
-// task.setPriority(Thread.MAX_PRIORITY);
-// task.start();
+ // task.setName(task.getClass().getName() + "[" + inc() + "]");
+ // task.setDaemon(true);
+ // task.setPriority(Thread.MAX_PRIORITY);
+ // task.start();
}
}
/**
- * Find an idle worker thread, if any. Could return null.
+ * Find an idle worker thread, if any. Could return null.
+ *
* @return a worker
*/
- public AbstractRxTask getRxTask()
- {
+ public AbstractRxTask getRxTask() {
AbstractRxTask worker = null;
synchronized (mutex) {
- while ( worker == null && running ) {
+ while (worker == null && running) {
if (idle.size() > 0) {
try {
worker = idle.remove(0);
} catch (java.util.NoSuchElementException x) {
- //this means that there are no available workers
+ // this means that there are no available workers
worker = null;
}
- } else if ( used.size() < this.maxTasks && creator != null) {
+ } else if (used.size() < this.maxTasks && creator != null) {
worker = creator.createRxTask();
configureTask(worker);
} else {
@@ -81,8 +81,8 @@ public class RxTaskPool {
Thread.currentThread().interrupt();
}
}
- }//while
- if ( worker != null ) {
+ } // while
+ if (worker != null) {
used.add(worker);
}
}
@@ -96,17 +96,17 @@ public class RxTaskPool {
}
/**
- * Called by the worker thread to return itself to the
- * idle pool.
+ * Called by the worker thread to return itself to the idle pool.
+ *
* @param worker The worker
*/
- public void returnWorker (AbstractRxTask worker) {
- if ( running ) {
+ public void returnWorker(AbstractRxTask worker) {
+ if (running) {
synchronized (mutex) {
used.remove(worker);
- //if ( idle.size() < minThreads && !idle.contains(worker))
idle.add(worker);
- if ( idle.size() < maxTasks && !idle.contains(worker)) {
- idle.add(worker); //let max be the upper limit
+ // if ( idle.size() < minThreads && !idle.contains(worker))
idle.add(worker);
+ if (idle.size() < maxTasks && !idle.contains(worker)) {
+ idle.add(worker); // let max be the upper limit
} else {
worker.close();
}
@@ -129,7 +129,7 @@ public class RxTaskPool {
running = false;
synchronized (mutex) {
Iterator<AbstractRxTask> i = idle.iterator();
- while ( i.hasNext() ) {
+ while (i.hasNext()) {
AbstractRxTask worker = i.next();
returnWorker(worker);
i.remove();
@@ -149,7 +149,7 @@ public class RxTaskPool {
return this.creator;
}
- public interface TaskCreator {
+ public interface TaskCreator {
AbstractRxTask createRxTask();
}
}
diff --git a/java/org/apache/catalina/tribes/transport/SenderState.java
b/java/org/apache/catalina/tribes/transport/SenderState.java
index fedac53917..a333ae127d 100644
--- a/java/org/apache/catalina/tribes/transport/SenderState.java
+++ b/java/org/apache/catalina/tribes/transport/SenderState.java
@@ -27,7 +27,7 @@ public class SenderState {
public static final int SUSPECT = 1;
public static final int FAILING = 2;
- protected static final ConcurrentMap<Member, SenderState> memberStates =
new ConcurrentHashMap<>();
+ protected static final ConcurrentMap<Member,SenderState> memberStates =
new ConcurrentHashMap<>();
public static SenderState getSenderState(Member member) {
return getSenderState(member, true);
@@ -54,7 +54,7 @@ public class SenderState {
private volatile int state = READY;
- // ----------------------------------------------------- Constructor
+ // ----------------------------------------------------- Constructor
private SenderState() {
@@ -90,6 +90,6 @@ public class SenderState {
}
- // ----------------------------------------------------- Public Properties
+ // ----------------------------------------------------- Public Properties
}
diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
index 093851b2b2..4f3c1c1668 100644
--- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
+++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
@@ -71,11 +71,11 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
public void start() throws IOException {
super.start();
try {
- setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
+ setPool(new RxTaskPool(getMaxThreads(), getMinThreads(), this));
} catch (Exception x) {
log.fatal(sm.getString("nioReceiver.threadpool.fail"), x);
- if ( x instanceof IOException ) {
- throw (IOException)x;
+ if (x instanceof IOException) {
+ throw (IOException) x;
} else {
throw new IOException(x.getMessage());
}
@@ -92,8 +92,8 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
t.start();
} catch (Exception x) {
log.fatal(sm.getString("nioReceiver.start.fail"), x);
- if ( x instanceof IOException ) {
- throw (IOException)x;
+ if (x instanceof IOException) {
+ throw (IOException) x;
} else {
throw new IOException(x.getMessage());
}
@@ -102,7 +102,7 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
@Override
public AbstractRxTask createRxTask() {
- NioReplicationTask thread = new NioReplicationTask(this,this);
+ NioReplicationTask thread = new NioReplicationTask(this, this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
thread.setOptions(getWorkerThreadOptions());
@@ -110,7 +110,6 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
}
-
protected void bind() throws IOException {
// allocate an unbound server socket channel
serverChannel = ServerSocketChannel.open();
@@ -119,19 +118,19 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
// create a new Selector for use below
this.selector.set(Selector.open());
// set the port the server channel will listen to
- //serverSocket.bind(new InetSocketAddress(getBind(),
getTcpListenPort()));
- bind(serverSocket,getPort(),getAutoBind());
+ // serverSocket.bind(new InetSocketAddress(getBind(),
getTcpListenPort()));
+ bind(serverSocket, getPort(), getAutoBind());
// set non-blocking mode for the listening socket
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(this.selector.get(), SelectionKey.OP_ACCEPT);
- //set up the datagram channel
- if (this.getUdpPort()>0) {
+ // set up the datagram channel
+ if (this.getUdpPort() > 0) {
datagramChannel = DatagramChannel.open();
configureDatagraChannel();
- //bind to the address to avoid security checks
- bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind());
+ // bind to the address to avoid security checks
+ bindUdp(datagramChannel.socket(), getUdpPort(), getAutoBind());
}
}
@@ -162,7 +161,7 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
return;
}
Runnable r = null;
- while ((r = events.pollFirst()) != null ) {
+ while ((r = events.pollFirst()) != null) {
try {
if (log.isTraceEnabled()) {
log.trace("Processing event in selector:" + r);
@@ -175,73 +174,84 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
}
public static void cancelledKey(SelectionKey key) {
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader != null ) {
+ ObjectReader reader = (ObjectReader) key.attachment();
+ if (reader != null) {
reader.setCancelled(true);
reader.finish();
}
key.cancel();
key.attach(null);
if (key.channel() instanceof SocketChannel) {
- try { ((SocketChannel)key.channel()).socket().close(); } catch
(IOException e) { if (log.isDebugEnabled()) {
- log.debug(sm.getString("nioReceiver.closeError"), e);
- } }
+ try {
+ ((SocketChannel) key.channel()).socket().close();
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("nioReceiver.closeError"), e);
+ }
+ }
}
if (key.channel() instanceof DatagramChannel) {
- try { ((DatagramChannel)key.channel()).socket().close(); } catch
(Exception e) { if (log.isDebugEnabled()) {
+ try {
+ ((DatagramChannel) key.channel()).socket().close();
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("nioReceiver.closeError"), e);
+ }
+ }
+ }
+ try {
+ key.channel().close();
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
log.debug(sm.getString("nioReceiver.closeError"), e);
- } }
+ }
}
- try { key.channel().close(); } catch (IOException e) { if
(log.isDebugEnabled()) {
- log.debug(sm.getString("nioReceiver.closeError"), e);
- } }
}
+
protected long lastCheck = System.currentTimeMillis();
+
protected void socketTimeouts() {
long now = System.currentTimeMillis();
- if ( (now-lastCheck) < getSelectorTimeout() ) {
+ if ((now - lastCheck) < getSelectorTimeout()) {
return;
}
- //timeout
+ // timeout
Selector tmpsel = this.selector.get();
- Set<SelectionKey> keys =
(isListening()&&tmpsel!=null)?tmpsel.keys():null;
- if ( keys == null ) {
+ Set<SelectionKey> keys = (isListening() && tmpsel != null) ?
tmpsel.keys() : null;
+ if (keys == null) {
return;
}
for (SelectionKey key : keys) {
try {
-// if (key.interestOps() == SelectionKey.OP_READ) {
-// //only timeout sockets that we are waiting for a read
from
-// ObjectReader ka = (ObjectReader) key.attachment();
-// long delta = now - ka.getLastAccess();
-// if (delta > (long) getTimeout()) {
-// cancelledKey(key);
-// }
-// }
-// else
- if ( key.interestOps() == 0 ) {
- //check for keys that didn't make it in.
+ // if (key.interestOps() == SelectionKey.OP_READ) {
+ // //only timeout sockets that we are waiting for a read from
+ // ObjectReader ka = (ObjectReader) key.attachment();
+ // long delta = now - ka.getLastAccess();
+ // if (delta > (long) getTimeout()) {
+ // cancelledKey(key);
+ // }
+ // }
+ // else
+ if (key.interestOps() == 0) {
+ // check for keys that didn't make it in.
ObjectReader ka = (ObjectReader) key.attachment();
- if ( ka != null ) {
+ if (ka != null) {
long delta = now - ka.getLastAccess();
if (delta > getTimeout() && (!ka.isAccessed())) {
if (log.isWarnEnabled()) {
- log.warn(sm.getString(
- "nioReceiver.threadsExhausted",
- Integer.valueOf(getTimeout()),
- Boolean.valueOf(ka.isCancelled()),
- key,
+
log.warn(sm.getString("nioReceiver.threadsExhausted",
Integer.valueOf(getTimeout()),
+ Boolean.valueOf(ka.isCancelled()), key,
new
java.sql.Timestamp(ka.getLastAccess())));
}
ka.setLastAccess(now);
- //key.interestOps(SelectionKey.OP_READ);
- }//end if
+ // key.interestOps(SelectionKey.OP_READ);
+ } // end if
} else {
cancelledKey(key);
- }//end if
- }//end if
- }catch ( CancelledKeyException ckx ) {
+ } // end if
+ } // end if
+ } catch (CancelledKeyException ckx) {
cancelledKey(key);
}
}
@@ -250,8 +260,8 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
/**
- * Get data from channel and store in byte array
- * send it to cluster
+ * Get data from channel and store in byte array send it to cluster
+ *
* @throws IOException IO error
*/
protected void listen() throws Exception {
@@ -265,9 +275,9 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
// Avoid NPEs if selector is set to null on stop.
Selector selector = this.selector.get();
- if (selector!=null && datagramChannel!=null) {
- ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size
for a datagram packet
-
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
+ if (selector != null && datagramChannel != null) {
+ ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size
for a datagram packet
+ registerChannel(selector, datagramChannel, SelectionKey.OP_READ,
oreader);
}
while (doListen() && selector != null) {
@@ -278,22 +288,22 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
socketTimeouts();
int n = selector.select(getSelectorTimeout());
if (n == 0) {
- //there is a good chance that we got here
- //because the TcpReplicationThread called
- //selector wakeup().
- //if that happens, we must ensure that that
- //thread has enough time to call interestOps
-// synchronized (interestOpsMutex) {
- //if we got the lock, means there are no
- //keys trying to register for the
- //interestOps method
-// }
+ // there is a good chance that we got here
+ // because the TcpReplicationThread called
+ // selector wakeup().
+ // if that happens, we must ensure that that
+ // thread has enough time to call interestOps
+ // synchronized (interestOpsMutex) {
+ // if we got the lock, means there are no
+ // keys trying to register for the
+ // interestOps method
+ // }
continue; // nothing to do
}
// get an iterator over the set of selected keys
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// look at each key in the selected set
- while (it!=null && it.hasNext()) {
+ while (it != null && it.hasNext()) {
SelectionKey key = it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
@@ -305,13 +315,10 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
channel.socket().setKeepAlive(getSoKeepAlive());
channel.socket().setOOBInline(getOoBInline());
channel.socket().setReuseAddress(getSoReuseAddress());
-
channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+ channel.socket().setSoLinger(getSoLingerOn(),
getSoLingerTime());
channel.socket().setSoTimeout(getTimeout());
Object attach = new ObjectReader(channel);
- registerChannel(selector,
- channel,
- SelectionKey.OP_READ,
- attach);
+ registerChannel(selector, channel,
SelectionKey.OP_READ, attach);
}
// is there data to read on this channel?
if (key.isReadable()) {
@@ -334,21 +341,20 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
}
serverChannel.close();
- if (datagramChannel!=null) {
+ if (datagramChannel != null) {
try {
datagramChannel.close();
- }catch (Exception iox) {
+ } catch (Exception iox) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("nioReceiver.closeError"), iox);
}
}
- datagramChannel=null;
+ datagramChannel = null;
}
closeSelector();
}
-
/**
* Close Selector.
*
@@ -365,7 +371,7 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
int count = 0;
while (running && count < 50) {
Thread.sleep(100);
- count ++;
+ count++;
}
if (running) {
log.warn(sm.getString("nioReceiver.stop.threadRunning"));
@@ -391,16 +397,16 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
key.attach(null);
key.cancel();
}
- } catch (IOException ignore){
+ } catch (IOException ignore) {
if (log.isWarnEnabled()) {
log.warn(sm.getString("nioReceiver.cleanup.fail"), ignore);
}
- } catch (ClosedSelectorException ignore){
+ } catch (ClosedSelectorException ignore) {
// Ignore
}
try {
selector.selectNow();
- } catch (Throwable t){
+ } catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// Ignore everything else
}
@@ -410,20 +416,18 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
// ----------------------------------------------------------
/**
- * Register the given channel with the given selector for
- * the given operations of interest
+ * Register the given channel with the given selector for the given
operations of interest
+ *
* @param selector The selector to use
- * @param channel The channel
- * @param ops The operations to register
- * @param attach Attachment object
+ * @param channel The channel
+ * @param ops The operations to register
+ * @param attach Attachment object
+ *
* @throws Exception IO error with channel
*/
- protected void registerChannel(Selector selector,
- SelectableChannel channel,
- int ops,
- Object attach) throws Exception {
- if (channel == null)
- {
+ protected void registerChannel(Selector selector, SelectableChannel
channel, int ops, Object attach)
+ throws Exception {
+ if (channel == null) {
return; // could happen
}
// set the new channel non-blocking
@@ -451,11 +455,11 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
/**
* Sample data handler method for a channel with data ready to read.
- * @param key A SelectionKey object associated with a channel
- * determined by the selector to be ready for reading. If the
- * channel returns an EOF condition, it is closed here, which
- * automatically invalidates the associated key. The selector
- * will then de-register the channel on the next select call.
+ *
+ * @param key A SelectionKey object associated with a channel determined
by the selector to be ready for reading. If
+ * the channel returns an EOF condition, it is closed here,
which automatically invalidates the
+ * associated key. The selector will then de-register the
channel on the next select call.
+ *
* @throws Exception IO error with channel
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
@@ -470,7 +474,7 @@ public class NioReceiver extends ReceiverBase implements
Runnable, NioReceiverMB
}
} else {
// invoking this wakes up the worker thread then returns
- //add task to thread pool
+ // add task to thread pool
task.serviceChannel(key);
getExecutor().execute(task);
}
diff --git
a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
index f28648c3b9..f4c91e1abf 100644
--- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
+++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.catalina.tribes.transport.nio;
+
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -41,14 +42,11 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
+ * A worker thread class which can drain channels and echo-back the input.
Each instance is constructed with a reference
+ * to the owning thread pool object. When started, the thread loops forever
waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
calling its serviceChannel() method with a
+ * SelectionKey object. The serviceChannel() method stores the key reference
in the thread object then calls notify() to
+ * wake it up. When the channel has been drained, the worker thread returns
itself to its parent pool.
*/
public class NioReplicationTask extends AbstractRxTask {
@@ -60,7 +58,7 @@ public class NioReplicationTask extends AbstractRxTask {
private int rxBufSize;
private final NioReceiver receiver;
- public NioReplicationTask (ListenCallback callback, NioReceiver receiver) {
+ public NioReplicationTask(ListenCallback callback, NioReceiver receiver) {
super(callback);
this.receiver = receiver;
}
@@ -68,12 +66,12 @@ public class NioReplicationTask extends AbstractRxTask {
// loop forever waiting for work to do
@Override
public synchronized void run() {
- if ( buffer == null ) {
+ if (buffer == null) {
int size = getRxBufSize();
if (key.channel() instanceof DatagramChannel) {
size = ChannelReceiver.MAX_UDP_SIZE;
}
- if ( (getOptions() & OPTION_DIRECT_BUFFER) ==
OPTION_DIRECT_BUFFER) {
+ if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER)
{
buffer = ByteBuffer.allocateDirect(size);
} else {
buffer = ByteBuffer.allocate(size);
@@ -84,135 +82,129 @@ public class NioReplicationTask extends AbstractRxTask {
if (key == null) {
return; // just in case
}
- if ( log.isTraceEnabled() ) {
- log.trace("Servicing key:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Servicing key:" + key);
}
try {
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader == null ) {
- if ( log.isTraceEnabled() ) {
- log.trace("No object reader, cancelling:"+key);
+ ObjectReader reader = (ObjectReader) key.attachment();
+ if (reader == null) {
+ if (log.isTraceEnabled()) {
+ log.trace("No object reader, cancelling:" + key);
}
cancelKey(key);
} else {
- if ( log.isTraceEnabled() ) {
- log.trace("Draining channel:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Draining channel:" + key);
}
drainChannel(key, reader);
}
} catch (Exception e) {
- //this is common, since the sockets on the other
- //end expire after a certain time.
- if ( e instanceof CancelledKeyException ) {
- //do nothing
- } else if ( e instanceof IOException ) {
- //don't spew out stack traces for IO exceptions unless debug
is enabled.
+ // this is common, since the sockets on the other
+ // end expire after a certain time.
+ if (e instanceof CancelledKeyException) {
+ // do nothing
+ } else if (e instanceof IOException) {
+ // don't spew out stack traces for IO exceptions unless debug
is enabled.
if (log.isDebugEnabled()) {
log.debug(sm.getString("nioReplicationTask.unable.drainChannel.ioe",
e.getMessage()), e);
} else {
- log.warn
(sm.getString("nioReplicationTask.unable.drainChannel.ioe", e.getMessage()));
+
log.warn(sm.getString("nioReplicationTask.unable.drainChannel.ioe",
e.getMessage()));
}
- } else if ( log.isErrorEnabled() ) {
- //this is a real error, log it.
-
log.error(sm.getString("nioReplicationTask.exception.drainChannel"),e);
+ } else if (log.isErrorEnabled()) {
+ // this is a real error, log it.
+
log.error(sm.getString("nioReplicationTask.exception.drainChannel"), e);
}
cancelKey(key);
}
key = null;
// done, ready for more, return to pool
- getTaskPool().returnWorker (this);
+ getTaskPool().returnWorker(this);
}
/**
- * Called to initiate a unit of work by this worker thread
- * on the provided SelectionKey object. This method is
- * synchronized, as is the run() method, so only one key
- * can be serviced at a given time.
- * Before waking the worker thread, and before returning
- * to the main selection loop, this key's interest set is
- * updated to remove OP_READ. This will cause the selector
- * to ignore read-readiness for this channel while the
- * worker thread is servicing it.
+ * Called to initiate a unit of work by this worker thread on the provided
SelectionKey object. This method is
+ * synchronized, as is the run() method, so only one key can be serviced
at a given time. Before waking the worker
+ * thread, and before returning to the main selection loop, this key's
interest set is updated to remove OP_READ.
+ * This will cause the selector to ignore read-readiness for this channel
while the worker thread is servicing it.
+ *
* @param key The key to process
*/
- public synchronized void serviceChannel (SelectionKey key) {
- if ( log.isTraceEnabled() ) {
- log.trace("About to service key:"+key);
+ public synchronized void serviceChannel(SelectionKey key) {
+ if (log.isTraceEnabled()) {
+ log.trace("About to service key:" + key);
}
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader != null ) {
+ ObjectReader reader = (ObjectReader) key.attachment();
+ if (reader != null) {
reader.setLastAccess(System.currentTimeMillis());
}
this.key = key;
- key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
- key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
/**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- * @param key The key to process
+ * The actual code which drains the channel associated with the given key.
This method assumes the key has been
+ * modified prior to invocation to turn off selection interest in OP_READ.
When this method completes it re-enables
+ * OP_READ and calls wakeup() on the selector so the selector will resume
watching this channel.
+ *
+ * @param key The key to process
* @param reader The reader
+ *
* @throws Exception IO error
*/
- protected void drainChannel (final SelectionKey key, ObjectReader reader)
throws Exception {
+ protected void drainChannel(final SelectionKey key, ObjectReader reader)
throws Exception {
reader.access();
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
- int count=-1;
+ int count = -1;
SocketAddress saddr = null;
if (channel instanceof SocketChannel) {
// loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
- if ( buffer.hasArray() ) {
- reader.append(buffer.array(),0,count,false);
+ while ((count = channel.read(buffer)) > 0) {
+ buffer.flip(); // make buffer readable
+ if (buffer.hasArray()) {
+ reader.append(buffer.array(), 0, count, false);
} else {
- reader.append(buffer,count,false);
+ reader.append(buffer, count, false);
}
- buffer.clear(); // make buffer empty
- //do we have at least one package?
- if ( reader.hasPackage() ) {
+ buffer.clear(); // make buffer empty
+ // do we have at least one package?
+ if (reader.hasPackage()) {
break;
}
}
} else if (channel instanceof DatagramChannel) {
- DatagramChannel dchannel = (DatagramChannel)channel;
+ DatagramChannel dchannel = (DatagramChannel) channel;
saddr = dchannel.receive(buffer);
- buffer.flip(); // make buffer readable
- if ( buffer.hasArray() ) {
-
reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false);
+ buffer.flip(); // make buffer readable
+ if (buffer.hasArray()) {
+ reader.append(buffer.array(), 0, buffer.limit() -
buffer.position(), false);
} else {
- reader.append(buffer,buffer.limit()-buffer.position(),false);
+ reader.append(buffer, buffer.limit() - buffer.position(),
false);
}
- buffer.clear(); // make buffer empty
- //did we get a package
- count = reader.hasPackage()?1:-1;
+ buffer.clear(); // make buffer empty
+ // did we get a package
+ count = reader.hasPackage() ? 1 : -1;
}
int pkgcnt = reader.count();
- if (count < 0 && pkgcnt == 0 ) {
- //end of stream, and no more packages to process
+ if (count < 0 && pkgcnt == 0) {
+ // end of stream, and no more packages to process
remoteEof(key);
return;
}
- ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY :
reader.execute();
+ ChannelMessage[] msgs = pkgcnt == 0 ? ChannelData.EMPTY_DATA_ARRAY :
reader.execute();
- registerForRead(key,reader);//register to read new data, before we
send it off to avoid dead locks
+ registerForRead(key, reader);// register to read new data, before we
send it off to avoid dead locks
for (ChannelMessage msg : msgs) {
/*
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronous request
+ * Use send ack here if you want to ack the request to the remote
server before completing the request This
+ * is considered an asynchronous request
*/
if (ChannelData.sendAckAsync(msg.getOptions())) {
sendAck(key, (WritableByteChannel) channel,
Constants.ACK_COMMAND, saddr);
@@ -220,16 +212,16 @@ public class NioReplicationTask extends AbstractRxTask {
try {
if (Logs.MESSAGES.isTraceEnabled()) {
try {
- Logs.MESSAGES.trace("NioReplicationThread - Received
msg:" + new UniqueId(msg.getUniqueId()) + " at " + new
java.sql.Timestamp(System.currentTimeMillis()));
+ Logs.MESSAGES.trace("NioReplicationThread - Received
msg:" + new UniqueId(msg.getUniqueId()) +
+ " at " + new
java.sql.Timestamp(System.currentTimeMillis()));
} catch (Throwable t) {
}
}
- //process the message
+ // process the message
getCallback().messageDataReceived(msg);
/*
- * Use send ack here if you want the request to complete on
this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
+ * Use send ack here if you want the request to complete on
this server before sending the ack to the
+ * remote server This is considered a synchronized request
*/
if (ChannelData.sendAckSync(msg.getOptions())) {
sendAck(key, (WritableByteChannel) channel,
Constants.ACK_COMMAND, saddr);
@@ -260,54 +252,54 @@ public class NioReplicationTask extends AbstractRxTask {
private void remoteEof(SelectionKey key) {
// close channel on EOF, invalidates the key
- if ( log.isDebugEnabled() ) {
+ if (log.isDebugEnabled()) {
log.debug(sm.getString("nioReplicationTask.disconnect"));
}
cancelKey(key);
}
protected void registerForRead(final SelectionKey key, ObjectReader
reader) {
- if ( log.isTraceEnabled() ) {
- log.trace("Adding key for read event:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding key for read event:" + key);
}
reader.finish();
- //register our OP_READ interest
+ // register our OP_READ interest
Runnable r = () -> {
try {
if (key.isValid()) {
// resume interest in OP_READ, OP_WRITE
int resumeOps = key.interestOps() | SelectionKey.OP_READ;
key.interestOps(resumeOps);
- if ( log.isTraceEnabled() ) {
- log.trace("Registering key for read:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Registering key for read:" + key);
}
}
- } catch (CancelledKeyException ckx ) {
+ } catch (CancelledKeyException ckx) {
NioReceiver.cancelledKey(key);
- if ( log.isTraceEnabled() ) {
- log.trace("CKX Cancelling key:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("CKX Cancelling key:" + key);
}
} catch (Exception x) {
-
log.error(sm.getString("nioReplicationTask.error.register.key", key),x);
+
log.error(sm.getString("nioReplicationTask.error.register.key", key), x);
}
};
receiver.addEvent(r);
}
private void cancelKey(final SelectionKey key) {
- if ( log.isTraceEnabled() ) {
- log.trace("Adding key for cancel event:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding key for cancel event:" + key);
}
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader != null ) {
+ ObjectReader reader = (ObjectReader) key.attachment();
+ if (reader != null) {
reader.setCancelled(true);
reader.finish();
}
Runnable cx = () -> {
- if ( log.isTraceEnabled() ) {
- log.trace("Cancelling key:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Cancelling key:" + key);
}
NioReceiver.cancelledKey(key);
@@ -317,9 +309,10 @@ public class NioReplicationTask extends AbstractRxTask {
/**
- * Send a reply-acknowledgement (6,2,3), sends it doing a busy write, the
ACK is so small
- * that it should always go to the buffer.
- * @param key The key to use
+ * Send a reply-acknowledgement (6,2,3), sends it doing a busy write, the
ACK is so small that it should always go
+ * to the buffer.
+ *
+ * @param key The key to use
* @param channel The channel
* @param command The command to write
* @param udpaddr Target address
@@ -330,22 +323,21 @@ public class NioReplicationTask extends AbstractRxTask {
ByteBuffer buf = ByteBuffer.wrap(command);
int total = 0;
if (channel instanceof DatagramChannel) {
- DatagramChannel dchannel = (DatagramChannel)channel;
- //were using a shared channel, document says its thread safe
- //TODO check optimization, one channel per thread?
- while ( total < command.length ) {
+ DatagramChannel dchannel = (DatagramChannel) channel;
+ // were using a shared channel, document says its thread safe
+ // TODO check optimization, one channel per thread?
+ while (total < command.length) {
total += dchannel.send(buf, udpaddr);
}
} else {
- while ( total < command.length ) {
+ while (total < command.length) {
total += channel.write(buf);
}
}
if (log.isTraceEnabled()) {
log.trace("ACK sent to " +
- ( (channel instanceof SocketChannel) ?
- ((SocketChannel)channel).socket().getInetAddress() :
-
((DatagramChannel)channel).socket().getInetAddress()));
+ ((channel instanceof SocketChannel) ? ((SocketChannel)
channel).socket().getInetAddress() :
+ ((DatagramChannel)
channel).socket().getInetAddress()));
}
} catch (IOException x) {
log.warn(sm.getString("nioReplicationTask.unable.ack",
x.getMessage()));
diff --git a/java/org/apache/catalina/tribes/transport/nio/NioSender.java
b/java/org/apache/catalina/tribes/transport/nio/NioSender.java
index 0090c54035..8e7aedb79f 100644
--- a/java/org/apache/catalina/tribes/transport/nio/NioSender.java
+++ b/java/org/apache/catalina/tribes/transport/nio/NioSender.java
@@ -33,15 +33,15 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
- * This class is NOT thread safe and should never be used with more than one
thread at a time
- *
- * This is a state machine, handled by the process method
- * States are:
- * - NOT_CONNECTED -> connect() -> CONNECTED
- * - CONNECTED -> setMessage() -> READY TO WRITE
- * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ
- * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE
- * - TRANSFER_COMPLETE -> CONNECTED
+ * This class is NOT thread safe and should never be used with more than one
thread at a time This is a state machine,
+ * handled by the process method States are:
+ * <ul>
+ * <li>NOT_CONNECTED -> connect() -> CONNECTED</li>
+ * <li>CONNECTED -> setMessage() -> READY TO WRITE</li>
+ * <li>READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ</li>
+ * <li>READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE</li>
+ * <li>TRANSFER_COMPLETE -> CONNECTED</li>
+ * </ul>
*/
public class NioSender extends AbstractSender {
@@ -59,7 +59,7 @@ public class NioSender extends AbstractSender {
protected ByteBuffer readbuf = null;
protected ByteBuffer writebuf = null;
protected volatile byte[] current = null;
- protected final XByteBuffer ackbuf = new XByteBuffer(128,true);
+ protected final XByteBuffer ackbuf = new XByteBuffer(128, true);
protected int remaining = 0;
protected boolean complete;
@@ -72,93 +72,96 @@ public class NioSender extends AbstractSender {
/**
* State machine to send data.
- * @param key The key to use
+ *
+ * @param key The key to use
* @param waitForAck Wait for an ack
+ *
* @return <code>true</code> if the processing was successful
+ *
* @throws IOException An IO error occurred
*/
public boolean process(SelectionKey key, boolean waitForAck) throws
IOException {
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
- //in case disconnect has been called
+ // in case disconnect has been called
if ((!isConnected()) && (!connecting)) {
throw new
IOException(sm.getString("nioSender.sender.disconnected"));
}
- if ( !key.isValid() ) {
+ if (!key.isValid()) {
throw new IOException(sm.getString("nioSender.key.inValid"));
}
- if ( key.isConnectable() ) {
- if ( socketChannel.finishConnect() ) {
+ if (key.isConnectable()) {
+ if (socketChannel.finishConnect()) {
completeConnect();
- if ( current != null ) {
+ if (current != null) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
return false;
- } else {
- //wait for the connection to finish
+ } else {
+ // wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
- }//end if
- } else if ( key.isWritable() ) {
+ } // end if
+ } else if (key.isWritable()) {
boolean writecomplete = write();
- if ( writecomplete ) {
- //we are completed, should we read an ack?
- if ( waitForAck ) {
- //register to read the ack
+ if (writecomplete) {
+ // we are completed, should we read an ack?
+ if (waitForAck) {
+ // register to read the ack
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
- //if not, we are ready, setMessage will reregister us for
another write interest
- //do a health check, we have no way of verify a
disconnected
- //socket since we don't register for OP_READ on
waitForAck=false
- read();//this causes overhead
- setRequestCount(getRequestCount()+1);
+ // if not, we are ready, setMessage will reregister us for
another write interest
+ // do a health check, we have no way of verify a
disconnected
+ // socket since we don't register for OP_READ on
waitForAck=false
+ read();// this causes overhead
+ setRequestCount(getRequestCount() + 1);
return true;
}
} else {
- //we are not complete, lets write some more
- key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
- }//end if
- } else if ( key.isReadable() ) {
+ // we are not complete, lets write some more
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } // end if
+ } else if (key.isReadable()) {
boolean readcomplete = read();
- if ( readcomplete ) {
- setRequestCount(getRequestCount()+1);
+ if (readcomplete) {
+ setRequestCount(getRequestCount() + 1);
return true;
} else {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- }//end if
+ } // end if
} else {
- //unknown state, should never happen
+ // unknown state, should never happen
log.warn(sm.getString("nioSender.unknown.state",
Integer.toString(ops)));
throw new IOException(sm.getString("nioSender.unknown.state",
Integer.toString(ops)));
- }//end if
+ } // end if
return false;
}
private void configureSocket() throws IOException {
- if (socketChannel!=null) {
+ if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.socket().setSendBufferSize(getTxBufSize());
socketChannel.socket().setReceiveBufferSize(getRxBufSize());
- socketChannel.socket().setSoTimeout((int)getTimeout());
-
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
+ socketChannel.socket().setSoTimeout((int) getTimeout());
+ socketChannel.socket().setSoLinger(getSoLingerOn(),
getSoLingerOn() ? getSoLingerTime() : 0);
socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
socketChannel.socket().setKeepAlive(getSoKeepAlive());
socketChannel.socket().setReuseAddress(getSoReuseAddress());
socketChannel.socket().setOOBInline(getOoBInline());
-
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+ socketChannel.socket().setSoLinger(getSoLingerOn(),
getSoLingerTime());
socketChannel.socket().setTrafficClass(getSoTrafficClass());
- } else if (dataChannel!=null) {
+ } else if (dataChannel != null) {
dataChannel.configureBlocking(false);
dataChannel.socket().setSendBufferSize(getUdpTxBufSize());
dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize());
- dataChannel.socket().setSoTimeout((int)getTimeout());
+ dataChannel.socket().setSoTimeout((int) getTimeout());
dataChannel.socket().setReuseAddress(getSoReuseAddress());
dataChannel.socket().setTrafficClass(getSoTrafficClass());
}
}
private void completeConnect() {
- //we connected, register ourselves for writing
+ // we connected, register ourselves for writing
setConnected(true);
connecting = false;
setRequestCount(0);
@@ -166,27 +169,26 @@ public class NioSender extends AbstractSender {
}
-
protected boolean read() throws IOException {
- //if there is no message here, we are done
- if ( current == null ) {
+ // if there is no message here, we are done
+ if (current == null) {
return true;
}
- int read = isUdpBased()?dataChannel.read(readbuf) :
socketChannel.read(readbuf);
- //end of stream
- if ( read == -1 ) {
+ int read = isUdpBased() ? dataChannel.read(readbuf) :
socketChannel.read(readbuf);
+ // end of stream
+ if (read == -1) {
throw new
IOException(sm.getString("nioSender.unable.receive.ack"));
- } else if ( read == 0 ) {
+ } else if (read == 0) {
return false;
}
readbuf.flip();
- ackbuf.append(readbuf,read);
+ ackbuf.append(readbuf, read);
readbuf.clear();
- if (ackbuf.doesPackageExist() ) {
+ if (ackbuf.doesPackageExist()) {
byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
- boolean ack =
Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
- boolean fack =
Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
- if ( fack && getThrowOnFailedAck() ) {
+ boolean ack = Arrays.equals(ackcmd,
org.apache.catalina.tribes.transport.Constants.ACK_DATA);
+ boolean fack = Arrays.equals(ackcmd,
org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
+ if (fack && getThrowOnFailedAck()) {
throw new
RemoteProcessException(sm.getString("nioSender.receive.failedAck"));
}
return ack || fack;
@@ -197,66 +199,66 @@ public class NioSender extends AbstractSender {
protected boolean write() throws IOException {
- if ( (!isConnected()) || (this.socketChannel==null &&
this.dataChannel==null)) {
+ if ((!isConnected()) || (this.socketChannel == null &&
this.dataChannel == null)) {
throw new IOException(sm.getString("nioSender.not.connected"));
}
- if ( current != null ) {
- if ( remaining > 0 ) {
- //we have written everything, or we are starting a new package
- //protect against buffer overwrite
- int byteswritten = isUdpBased()?dataChannel.write(writebuf) :
socketChannel.write(writebuf);
+ if (current != null) {
+ if (remaining > 0) {
+ // we have written everything, or we are starting a new package
+ // protect against buffer overwrite
+ int byteswritten = isUdpBased() ? dataChannel.write(writebuf)
: socketChannel.write(writebuf);
remaining -= byteswritten;
- //if the entire message was written from the buffer
- //reset the position counter
- if ( remaining < 0 ) {
+ // if the entire message was written from the buffer
+ // reset the position counter
+ if (remaining < 0) {
remaining = 0;
}
}
- return (remaining==0);
+ return (remaining == 0);
}
- //no message to send, we can consider that complete
+ // no message to send, we can consider that complete
return true;
}
@Override
public synchronized void connect() throws IOException {
- if ( connecting || isConnected()) {
+ if (connecting || isConnected()) {
return;
}
connecting = true;
- if ( isConnected() ) {
+ if (isConnected()) {
throw new IOException(sm.getString("nioSender.already.connected"));
}
- if ( readbuf == null ) {
+ if (readbuf == null) {
readbuf = getReadBuffer();
} else {
readbuf.clear();
}
- if ( writebuf == null ) {
+ if (writebuf == null) {
writebuf = getWriteBuffer();
} else {
writebuf.clear();
}
if (isUdpBased()) {
- InetSocketAddress daddr = new
InetSocketAddress(getAddress(),getUdpPort());
- if ( dataChannel != null ) {
+ InetSocketAddress daddr = new InetSocketAddress(getAddress(),
getUdpPort());
+ if (dataChannel != null) {
throw new
IOException(sm.getString("nioSender.datagram.already.established"));
}
dataChannel = DatagramChannel.open();
configureSocket();
dataChannel.connect(daddr);
completeConnect();
- dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this);
+ dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
} else {
- InetSocketAddress addr = new
InetSocketAddress(getAddress(),getPort());
- if ( socketChannel != null ) {
+ InetSocketAddress addr = new InetSocketAddress(getAddress(),
getPort());
+ if (socketChannel != null) {
throw new
IOException(sm.getString("nioSender.socketChannel.already.established"));
}
socketChannel = SocketChannel.open();
configureSocket();
- if ( socketChannel.connect(addr) ) {
+ if (socketChannel.connect(addr)) {
completeConnect();
socketChannel.register(getSelector(), SelectionKey.OP_WRITE,
this);
} else {
@@ -278,10 +280,10 @@ public class NioSender extends AbstractSender {
} catch (Exception x) {
// Ignore
}
- //error free close, all the way
- //try {socket.shutdownOutput();}catch ( Exception x){}
- //try {socket.shutdownInput();}catch ( Exception x){}
- //try {socket.close();}catch ( Exception x){}
+ // error free close, all the way
+ // try {socket.shutdownOutput();}catch ( Exception x){}
+ // try {socket.shutdownInput();}catch ( Exception x){}
+ // try {socket.close();}catch ( Exception x){}
try {
socketChannel.close();
} catch (Exception x) {
@@ -298,10 +300,10 @@ public class NioSender extends AbstractSender {
} catch (Exception x) {
// Ignore
}
- //error free close, all the way
- //try {socket.shutdownOutput();}catch ( Exception x){}
- //try {socket.shutdownInput();}catch ( Exception x){}
- //try {socket.close();}catch ( Exception x){}
+ // error free close, all the way
+ // try {socket.shutdownOutput();}catch ( Exception x){}
+ // try {socket.shutdownInput();}catch ( Exception x){}
+ // try {socket.close();}catch ( Exception x){}
try {
dataChannel.close();
} catch (Exception x) {
@@ -311,22 +313,22 @@ public class NioSender extends AbstractSender {
dataChannel = null;
}
}
- } catch ( Exception x ) {
+ } catch (Exception x) {
log.error(sm.getString("nioSender.unable.disconnect",
x.getMessage()));
- if ( log.isDebugEnabled() ) {
- log.debug(sm.getString("nioSender.unable.disconnect",
x.getMessage()),x);
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("nioSender.unable.disconnect",
x.getMessage()), x);
}
}
}
public void reset() {
- if ( isConnected() && readbuf == null) {
+ if (isConnected() && readbuf == null) {
readbuf = getReadBuffer();
}
- if ( readbuf != null ) {
+ if (readbuf != null) {
readbuf.clear();
}
- if ( writebuf != null ) {
+ if (writebuf != null) {
writebuf.clear();
}
current = null;
@@ -346,20 +348,21 @@ public class NioSender extends AbstractSender {
}
private ByteBuffer getBuffer(int size) {
- return
getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size);
+ return getDirectBuffer() ? ByteBuffer.allocateDirect(size) :
ByteBuffer.allocate(size);
}
/**
* Send message.
*
* @param data ChannelMessage
+ *
* @throws IOException if an error occurs
*/
public void setMessage(byte[] data) throws IOException {
- setMessage(data,0,data.length);
+ setMessage(data, 0, data.length);
}
- public void setMessage(byte[] data,int offset, int length) throws
IOException {
+ public void setMessage(byte[] data, int offset, int length) throws
IOException {
if (data != null) {
synchronized (this) {
current = data;
@@ -375,7 +378,7 @@ public class NioSender extends AbstractSender {
}
// TODO use ByteBuffer.wrap to avoid copying the data.
- writebuf.put(data,offset,length);
+ writebuf.put(data, offset, length);
writebuf.flip();
if (isConnected()) {
if (isUdpBased()) {
diff --git
a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
index f73347d5ff..2c0f7ca850 100644
--- a/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
+++ b/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
@@ -52,7 +52,7 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
private final InternalState state;
- protected final long selectTimeout = 5000; //default 5 seconds, same as
send timeout
+ protected final long selectTimeout = 5000; // default 5 seconds, same as
send timeout
public ParallelNioSender() throws IOException {
state = new InternalState(Selector.open());
@@ -62,25 +62,23 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
@Override
- public synchronized void sendMessage(Member[] destination, ChannelMessage
msg)
- throws ChannelException {
+ public synchronized void sendMessage(Member[] destination, ChannelMessage
msg) throws ChannelException {
long start = System.currentTimeMillis();
- this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) ==
Channel.SEND_OPTIONS_UDP);
- byte[] data = XByteBuffer.createDataPackage((ChannelData)msg);
+ this.setUdpBased((msg.getOptions() & Channel.SEND_OPTIONS_UDP) ==
Channel.SEND_OPTIONS_UDP);
+ byte[] data = XByteBuffer.createDataPackage((ChannelData) msg);
NioSender[] senders = setupForSend(destination);
connect(senders);
- setData(senders,data);
+ setData(senders, data);
int remaining = senders.length;
ChannelException cx = null;
try {
- //loop until complete, an error happens, or we timeout
+ // loop until complete, an error happens, or we timeout
long delta = System.currentTimeMillis() - start;
- boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK &
- msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
- while ( (remaining>0) && (delta<getTimeout()) ) {
+ boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK &
msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
+ while ((remaining > 0) && (delta < getTimeout())) {
try {
- SendResult result = doLoop(selectTimeout,
getMaxRetryAttempts(),waitForAck,msg);
+ SendResult result = doLoop(selectTimeout,
getMaxRetryAttempts(), waitForAck, msg);
remaining -= result.getCompleted();
if (result.getFailed() != null) {
remaining -=
result.getFailed().getFaultyMembers().length;
@@ -90,13 +88,13 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
cx.addFaultyMember(result.getFailed().getFaultyMembers());
}
}
- } catch (Exception x ) {
+ } catch (Exception x) {
if (log.isTraceEnabled()) {
log.trace("Error sending message", x);
}
if (cx == null) {
- if ( x instanceof ChannelException ) {
- cx = (ChannelException)x;
+ if (x instanceof ChannelException) {
+ cx = (ChannelException) x;
} else {
cx = new
ChannelException(sm.getString("parallelNioSender.send.failed"), x);
}
@@ -110,13 +108,13 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
}
delta = System.currentTimeMillis() - start;
}
- if ( remaining > 0 ) {
- //timeout has occurred
- ChannelException cxtimeout = new ChannelException(sm.getString(
- "parallelNioSender.operation.timedout",
Long.toString(getTimeout())));
+ if (remaining > 0) {
+ // timeout has occurred
+ ChannelException cxtimeout = new ChannelException(
+ sm.getString("parallelNioSender.operation.timedout",
Long.toString(getTimeout())));
if (cx == null) {
- cx = new
ChannelException(sm.getString("parallelNioSender.operation.timedout",
- Long.toString(getTimeout())));
+ cx = new ChannelException(
+
sm.getString("parallelNioSender.operation.timedout",
Long.toString(getTimeout())));
}
for (NioSender sender : senders) {
if (!sender.isComplete()) {
@@ -124,18 +122,18 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
}
}
throw cx;
- } else if ( cx != null ) {
- //there was an error
+ } else if (cx != null) {
+ // there was an error
throw cx;
}
- } catch (Exception x ) {
+ } catch (Exception x) {
try {
this.disconnect();
} catch (Exception e) {
// Ignore
}
- if ( x instanceof ChannelException ) {
- throw (ChannelException)x;
+ if (x instanceof ChannelException) {
+ throw (ChannelException) x;
} else {
throw new ChannelException(x);
}
@@ -165,44 +163,45 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
sk.interestOps(sk.interestOps() & ~readyOps);
NioSender sender = (NioSender) sk.attachment();
try {
- if (sender.process(sk,waitForAck)) {
+ if (sender.process(sk, waitForAck)) {
sender.setComplete(true);
result.complete(sender);
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" +
- new UniqueId(msg.getUniqueId()) + " at " +
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" +
new UniqueId(msg.getUniqueId()) + " at " +
new
java.sql.Timestamp(System.currentTimeMillis()) + " to " +
sender.getDestination().getName());
}
SenderState.getSenderState(sender.getDestination()).setReady();
- }//end if
+ } // end if
} catch (Exception x) {
if (log.isTraceEnabled()) {
- log.trace("Error while processing send to " +
sender.getDestination().getName(),
- x);
+ log.trace("Error while processing send to " +
sender.getDestination().getName(), x);
}
SenderState state =
SenderState.getSenderState(sender.getDestination());
- int attempt = sender.getAttempt()+1;
- boolean retry = (attempt <= maxAttempts && maxAttempts>0);
+ int attempt = sender.getAttempt() + 1;
+ boolean retry = (attempt <= maxAttempts && maxAttempts > 0);
synchronized (state) {
- //sk.cancel();
+ // sk.cancel();
if (state.isSuspect()) {
state.setFailing();
}
if (state.isReady()) {
state.setSuspect();
- if ( retry ) {
-
log.warn(sm.getString("parallelNioSender.send.fail.retrying",
sender.getDestination().getName()));
+ if (retry) {
+
log.warn(sm.getString("parallelNioSender.send.fail.retrying",
+ sender.getDestination().getName()));
} else {
log.warn(sm.getString("parallelNioSender.send.fail",
sender.getDestination().getName()), x);
}
}
}
- if ( !isConnected() ) {
-
log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry",
sender.getDestination().getName()));
- ChannelException cx = new
ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"),
x);
- cx.addFaultyMember(sender.getDestination(),x);
+ if (!isConnected()) {
+
log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry",
+ sender.getDestination().getName()));
+ ChannelException cx =
+ new
ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"),
x);
+ cx.addFaultyMember(sender.getDestination(), x);
result.failed(cx);
break;
}
@@ -214,17 +213,15 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
- } catch (Exception ignore){
+ } catch (Exception ignore) {
state.setFailing();
}
} else {
- ChannelException cx = new ChannelException(
-
sm.getString("parallelNioSender.sendFailed.attempt",
- Integer.toString(sender.getAttempt()),
- Integer.toString(maxAttempts)), x);
- cx.addFaultyMember(sender.getDestination(),x);
+ ChannelException cx = new
ChannelException(sm.getString("parallelNioSender.sendFailed.attempt",
+ Integer.toString(sender.getAttempt()),
Integer.toString(maxAttempts)), x);
+ cx.addFaultyMember(sender.getDestination(), x);
result.failed(cx);
- }//end if
+ } // end if
}
}
return result;
@@ -233,15 +230,18 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
private static class SendResult {
private List<NioSender> completeSenders = new ArrayList<>();
private ChannelException exception = null;
+
private void complete(NioSender sender) {
if (!completeSenders.contains(sender)) {
completeSenders.add(sender);
}
}
+
private int getCompleted() {
return completeSenders.size();
}
- private void failed(ChannelException cx){
+
+ private void failed(ChannelException cx) {
if (exception == null) {
exception = cx;
}
@@ -265,7 +265,7 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
x.addFaultyMember(sender.getDestination(), io);
}
}
- if ( x != null ) {
+ if (x != null) {
throw x;
}
}
@@ -282,7 +282,7 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
x.addFaultyMember(sender.getDestination(), io);
}
}
- if ( x != null ) {
+ if (x != null) {
throw x;
}
}
@@ -291,7 +291,7 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
private NioSender[] setupForSend(Member[] destination) throws
ChannelException {
ChannelException cx = null;
NioSender[] result = new NioSender[destination.length];
- for ( int i=0; i<destination.length; i++ ) {
+ for (int i = 0; i < destination.length; i++) {
NioSender sender = state.nioSenders.get(destination[i]);
try {
@@ -305,14 +305,14 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
sender.setSelector(state.selector);
sender.setUdpBased(isUdpBased());
result[i] = sender;
- }catch ( UnknownHostException x ) {
+ } catch (UnknownHostException x) {
if (cx == null) {
cx = new
ChannelException(sm.getString("parallelNioSender.unable.setup.NioSender"), x);
}
cx.addFaultyMember(destination[i], x);
}
}
- if ( cx != null ) {
+ if (cx != null) {
throw cx;
} else {
return result;
@@ -321,12 +321,12 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
@Override
public void connect() {
- //do nothing, we connect on demand
+ // do nothing, we connect on demand
setConnected(true);
}
- private synchronized void close() throws ChannelException {
+ private synchronized void close() throws ChannelException {
ChannelException x = null;
Iterator<Map.Entry<Member,NioSender>> iter =
state.nioSenders.entrySet().iterator();
while (iter.hasNext()) {
@@ -353,9 +353,9 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
@Override
public void remove(Member member) {
- //disconnect senders
+ // disconnect senders
NioSender sender = state.nioSenders.remove(member);
- if ( sender != null ) {
+ if (sender != null) {
sender.disconnect();
}
}
@@ -375,29 +375,32 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
public synchronized boolean keepalive() {
boolean result = false;
for (Iterator<Entry<Member,NioSender>> i =
state.nioSenders.entrySet().iterator(); i.hasNext();) {
- Map.Entry<Member, NioSender> entry = i.next();
+ Map.Entry<Member,NioSender> entry = i.next();
NioSender sender = entry.getValue();
- if ( sender.keepalive() ) {
- //nioSenders.remove(entry.getKey());
+ if (sender.keepalive()) {
+ // nioSenders.remove(entry.getKey());
i.remove();
result = true;
} else {
try {
sender.read();
- }catch ( IOException x ) {
+ } catch (IOException x) {
sender.disconnect();
sender.reset();
- //nioSenders.remove(entry.getKey());
+ // nioSenders.remove(entry.getKey());
i.remove();
result = true;
- }catch ( Exception x ) {
- log.warn(sm.getString("parallelNioSender.error.keepalive",
sender),x);
+ } catch (Exception x) {
+ log.warn(sm.getString("parallelNioSender.error.keepalive",
sender), x);
}
}
}
- //clean up any cancelled keys
- if ( result ) {
- try { state.selector.selectNow(); }catch (Exception e){/*Ignore*/}
+ // clean up any cancelled keys
+ if (result) {
+ try {
+ state.selector.selectNow();
+ } catch (Exception e) {
+ /* Ignore */}
}
return result;
}
@@ -406,7 +409,7 @@ public class ParallelNioSender extends AbstractSender
implements MultiPointSende
private static class InternalState implements Runnable {
private final Selector selector;
- private final HashMap<Member, NioSender> nioSenders = new HashMap<>();
+ private final HashMap<Member,NioSender> nioSenders = new HashMap<>();
private InternalState(Selector selector) {
this.selector = selector;
diff --git
a/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
b/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
index d344022574..7ad0c8f173 100644
--- a/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
+++ b/java/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
@@ -33,13 +33,13 @@ public class PooledParallelSender extends PooledSender
implements PooledParallel
if (!isConnected()) {
throw new
ChannelException(sm.getString("pooledParallelSender.sender.disconnected"));
}
- ParallelNioSender sender = (ParallelNioSender)getSender();
+ ParallelNioSender sender = (ParallelNioSender) getSender();
if (sender == null) {
- ChannelException cx = new ChannelException(sm.getString(
- "pooledParallelSender.unable.retrieveSender.timeout",
- Long.toString(getMaxWait())));
+ ChannelException cx = new ChannelException(
+
sm.getString("pooledParallelSender.unable.retrieveSender.timeout",
Long.toString(getMaxWait())));
for (Member member : destination) {
- cx.addFaultyMember(member, new
NullPointerException(sm.getString("pooledParallelSender.unable.retrieveSender")));
+ cx.addFaultyMember(member,
+ new
NullPointerException(sm.getString("pooledParallelSender.unable.retrieveSender")));
}
throw cx;
} else {
@@ -62,10 +62,10 @@ public class PooledParallelSender extends PooledSender
implements PooledParallel
public DataSender getNewDataSender() {
try {
ParallelNioSender sender = new ParallelNioSender();
- transferProperties(this,sender);
+ transferProperties(this, sender);
return sender;
- } catch ( IOException x ) {
- throw new
RuntimeException(sm.getString("pooledParallelSender.unable.open"),x);
+ } catch (IOException x) {
+ throw new
RuntimeException(sm.getString("pooledParallelSender.unable.open"), x);
}
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]