Author: remm
Date: Thu Nov 8 10:43:00 2018
New Revision: 1846119
URL: http://svn.apache.org/viewvc?rev=1846119&view=rev
Log:
Refactor various operations performed in tribes using a scheduled executor.
When tribes is not running standalone, it will use the executor from the
Catalina Service.
If running independently (like in the testsuite), the Channel will provide the
executor.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
tomcat/trunk/webapps/docs/changelog.xml
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Thu Nov 8
10:43:00 2018
@@ -18,6 +18,7 @@ package org.apache.catalina.tribes;
import java.io.Serializable;
import java.util.StringJoiner;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
@@ -374,6 +375,18 @@ public interface Channel {
public void setName(String name);
/**
+ * Return executor that can be used for utility tasks.
+ * @return the executor
+ */
+ public ScheduledExecutorService getUtilityExecutor();
+
+ /**
+ * Set the executor that can be used for utility tasks.
+ * @param utilityExecutor the executor
+ */
+ public void setUtilityExecutor(ScheduledExecutorService utilityExecutor);
+
+ /**
* Translates the name of an option to its integer value. Valid option
names are "asynchronous" (alias "async"),
* "byte_message" (alias "byte"), "multicast", "secure",
"synchronized_ack" (alias "sync"), "udp", "use_ack"
* @param opt The name of the option
Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java Thu
Nov 8 10:43:00 2018
@@ -22,6 +22,10 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -72,6 +76,7 @@ public class GroupChannel extends Channe
* If set to true, the channel will start a local thread for the heart
beat.
*/
protected boolean heartbeat = true;
+
/**
* If <code>heartbeat == true</code> then how often do we want this
* heartbeat to run. default is one minute
@@ -79,9 +84,9 @@ public class GroupChannel extends Channe
protected long heartbeatSleeptime = 5*1000;//every 5 seconds
/**
- * Internal heartbeat thread
+ * Internal heartbeat future
*/
- protected HeartbeatThread hbthread = null;
+ protected ScheduledFuture<?> heartbeatFuture = null;
/**
* The <code>ChannelCoordinator</code> coordinates the bottom layer
components:<br>
@@ -134,6 +139,11 @@ public class GroupChannel extends Channe
private boolean jmxEnabled = true;
/**
+ * Executor service.
+ */
+ protected ScheduledExecutorService utilityExecutor = null;
+
+ /**
* the ObjectName of this channel.
*/
private ObjectName oname = null;
@@ -446,6 +456,8 @@ public class GroupChannel extends Channe
}
+ protected boolean ownExecutor = false;
+
/**
* Starts the channel.
* @param svc int - what service to start
@@ -459,10 +471,15 @@ public class GroupChannel extends Channe
// register jmx
JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this);
if (jmxRegistry != null) this.oname =
jmxRegistry.registerJmx(",component=Channel", this);
+ if (utilityExecutor == null) {
+ log.warn(sm.getString("groupChannel.warn.noUtilityExecutor"));
+ utilityExecutor = new ScheduledThreadPoolExecutor(1);
+ ownExecutor = true;
+ }
super.start(svc);
- if ( hbthread == null && heartbeat ) {
- hbthread = new HeartbeatThread(this,heartbeatSleeptime);
- hbthread.start();
+ if (heartbeatFuture == null && heartbeat) {
+ heartbeatFuture = utilityExecutor.scheduleWithFixedDelay
+ (new HeartbeatRunnable(), heartbeatSleeptime,
heartbeatSleeptime, TimeUnit.MILLISECONDS);
}
}
@@ -474,11 +491,16 @@ public class GroupChannel extends Channe
*/
@Override
public synchronized void stop(int svc) throws ChannelException {
- if (hbthread != null) {
- hbthread.stopHeartbeat();
- hbthread = null;
+ if (heartbeatFuture != null) {
+ heartbeatFuture.cancel(true);
+ heartbeatFuture = null;
}
super.stop(svc);
+ if (ownExecutor) {
+ utilityExecutor.shutdown();
+ utilityExecutor = null;
+ ownExecutor = false;
+ }
if (oname != null) {
JmxRegistry.getRegistry(this).unregisterJmx(oname);
oname = null;
@@ -494,6 +516,16 @@ public class GroupChannel extends Channe
else return coordinator;
}
+ @Override
+ public ScheduledExecutorService getUtilityExecutor() {
+ return utilityExecutor;
+ }
+
+ @Override
+ public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
+ this.utilityExecutor = utilityExecutor;
+ }
+
/**
* Returns the channel receiver component
* @return ChannelReceiver
@@ -764,56 +796,20 @@ public class GroupChannel extends Channe
}
/**
- *
- * <p>Title: Internal heartbeat thread</p>
+ * <p>Title: Internal heartbeat runnable</p>
*
* <p>Description: if <code>Channel.getHeartbeat()==true</code> then a
thread of this class
* is created</p>
- *
- * @version 1.0
*/
- public static class HeartbeatThread extends Thread {
- private static final Log log =
LogFactory.getLog(HeartbeatThread.class);
- protected static int counter = 1;
- protected static synchronized int inc() {
- return counter++;
- }
-
- protected volatile boolean doRun = true;
- protected final GroupChannel channel;
- protected final long sleepTime;
- public HeartbeatThread(GroupChannel channel, long sleepTime) {
- super();
- this.setPriority(MIN_PRIORITY);
- String channelName = "";
- if (channel.getName() != null) channelName = "[" +
channel.getName() + "]";
- setName("GroupChannel-Heartbeat" + channelName + "-" +inc());
- setDaemon(true);
- this.channel = channel;
- this.sleepTime = sleepTime;
- }
- public void stopHeartbeat() {
- doRun = false;
- interrupt();
- }
-
+ public class HeartbeatRunnable implements Runnable {
@Override
public void run() {
- while (doRun) {
- try {
- Thread.sleep(sleepTime);
- channel.heartbeat();
- } catch ( InterruptedException x ) {
- // Ignore. Probably triggered by a call to stopHeartbeat().
- // In the highly unlikely event it was a different trigger,
- // simply ignore it and continue.
- } catch ( Exception x ) {
-
log.error(sm.getString("groupChannel.unable.sendHeartbeat"),x);
- }//catch
- }//while
- }//run
- }//HeartbeatThread
-
-
+ try {
+ heartbeat();
+ } catch (Exception x) {
+ log.error(sm.getString("groupChannel.unable.sendHeartbeat"),
x);
+ }
+ }
+ }
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
[UTF-8] (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
[UTF-8] Thu Nov 8 10:43:00 2018
@@ -23,4 +23,5 @@ groupChannel.sendFail.noRpcChannelReply=
groupChannel.optionFlag.conflict=Interceptor option flag conflict: [{0}]
groupChannel.listener.alreadyExist=Listener already exists:[{0}][{1}]
groupChannel.unable.sendHeartbeat=Unable to send heartbeat through Tribes
interceptor stack. Will try to sleep again.
+groupChannel.warn.noUtilityExecutor=No utility executor was set, creating one
rpcChannel.replyFailed=Unable to send back reply in RpcChannel.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
Thu Nov 8 10:43:00 2018
@@ -18,6 +18,7 @@ package org.apache.catalina.tribes.group
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
@@ -156,8 +157,9 @@ public class StaticMembershipInterceptor
if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ )
super.start(Channel.SND_RX_SEQ);
if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ )
super.start(Channel.SND_TX_SEQ);
final ChannelInterceptorBase base = this;
+ ScheduledExecutorService executor = getChannel().getUtilityExecutor();
for (final Member member : members) {
- Thread t = new Thread() {
+ Runnable r = new Runnable() {
@Override
public void run() {
base.memberAdded(member);
@@ -166,7 +168,7 @@ public class StaticMembershipInterceptor
}
}
};
- t.start();
+ executor.execute(r);
}
super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ));
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
Thu Nov 8 10:43:00 2018
@@ -18,14 +18,12 @@
package org.apache.catalina.tribes.membership;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.MembershipProvider;
import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.util.ExecutorFactory;
public abstract class MembershipProviderBase implements MembershipProvider {
@@ -33,7 +31,7 @@ public abstract class MembershipProvider
protected MembershipListener membershipListener;
protected MembershipService service;
// The event notification executor
- protected final ExecutorService executor =
ExecutorFactory.newThreadPool(0, 10, 10, TimeUnit.SECONDS);
+ protected ScheduledExecutorService executor;
@Override
public void init(Properties properties) throws Exception {
@@ -65,5 +63,6 @@ public abstract class MembershipProvider
@Override
public void setMembershipService(MembershipService service) {
this.service = service;
+ executor = service.getChannel().getUtilityExecutor();
}
}
\ No newline at end of file
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Nov 8 10:43:00 2018
@@ -77,6 +77,16 @@
</update>
</changelog>
</subsection>
+ <subsection name="Tribes">
+ <changelog>
+ <update>
+ Refactor various operations performed in tribes using a scheduled
+ executor. When tribes is not running standalone, it will use the
+ executor from the Catalina Service. If running independently, the
+ Channel will provide the executor. (remm)
+ </update>
+ </changelog>
+ </subsection>
</section>
<section name="Tomcat 9.0.13 (markt)" rtext="2018-11-07">
<subsection name="Catalina">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]