Author: remm Date: Thu Nov 8 10:43:00 2018 New Revision: 1846119 URL: http://svn.apache.org/viewvc?rev=1846119&view=rev Log: Refactor various operations performed in tribes using a scheduled executor. When tribes is not running standalone, it will use the executor from the Catalina Service. If running independently (like in the testsuite), the Channel will provide the executor.
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Thu Nov 8 10:43:00 2018 @@ -18,6 +18,7 @@ package org.apache.catalina.tribes; import java.io.Serializable; import java.util.StringJoiner; +import java.util.concurrent.ScheduledExecutorService; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -374,6 +375,18 @@ public interface Channel { public void setName(String name); /** + * Return executor that can be used for utility tasks. + * @return the executor + */ + public ScheduledExecutorService getUtilityExecutor(); + + /** + * Set the executor that can be used for utility tasks. + * @param utilityExecutor the executor + */ + public void setUtilityExecutor(ScheduledExecutorService utilityExecutor); + + /** * Translates the name of an option to its integer value. Valid option names are "asynchronous" (alias "async"), * "byte_message" (alias "byte"), "multicast", "secure", "synchronized_ack" (alias "sync"), "udp", "use_ack" * @param opt The name of the option Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java Thu Nov 8 10:43:00 2018 @@ -22,6 +22,10 @@ import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -72,6 +76,7 @@ public class GroupChannel extends Channe * If set to true, the channel will start a local thread for the heart beat. */ protected boolean heartbeat = true; + /** * If <code>heartbeat == true</code> then how often do we want this * heartbeat to run. default is one minute @@ -79,9 +84,9 @@ public class GroupChannel extends Channe protected long heartbeatSleeptime = 5*1000;//every 5 seconds /** - * Internal heartbeat thread + * Internal heartbeat future */ - protected HeartbeatThread hbthread = null; + protected ScheduledFuture<?> heartbeatFuture = null; /** * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> @@ -134,6 +139,11 @@ public class GroupChannel extends Channe private boolean jmxEnabled = true; /** + * Executor service. + */ + protected ScheduledExecutorService utilityExecutor = null; + + /** * the ObjectName of this channel. */ private ObjectName oname = null; @@ -446,6 +456,8 @@ public class GroupChannel extends Channe } + protected boolean ownExecutor = false; + /** * Starts the channel. * @param svc int - what service to start @@ -459,10 +471,15 @@ public class GroupChannel extends Channe // register jmx JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this); if (jmxRegistry != null) this.oname = jmxRegistry.registerJmx(",component=Channel", this); + if (utilityExecutor == null) { + log.warn(sm.getString("groupChannel.warn.noUtilityExecutor")); + utilityExecutor = new ScheduledThreadPoolExecutor(1); + ownExecutor = true; + } super.start(svc); - if ( hbthread == null && heartbeat ) { - hbthread = new HeartbeatThread(this,heartbeatSleeptime); - hbthread.start(); + if (heartbeatFuture == null && heartbeat) { + heartbeatFuture = utilityExecutor.scheduleWithFixedDelay + (new HeartbeatRunnable(), heartbeatSleeptime, heartbeatSleeptime, TimeUnit.MILLISECONDS); } } @@ -474,11 +491,16 @@ public class GroupChannel extends Channe */ @Override public synchronized void stop(int svc) throws ChannelException { - if (hbthread != null) { - hbthread.stopHeartbeat(); - hbthread = null; + if (heartbeatFuture != null) { + heartbeatFuture.cancel(true); + heartbeatFuture = null; } super.stop(svc); + if (ownExecutor) { + utilityExecutor.shutdown(); + utilityExecutor = null; + ownExecutor = false; + } if (oname != null) { JmxRegistry.getRegistry(this).unregisterJmx(oname); oname = null; @@ -494,6 +516,16 @@ public class GroupChannel extends Channe else return coordinator; } + @Override + public ScheduledExecutorService getUtilityExecutor() { + return utilityExecutor; + } + + @Override + public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { + this.utilityExecutor = utilityExecutor; + } + /** * Returns the channel receiver component * @return ChannelReceiver @@ -764,56 +796,20 @@ public class GroupChannel extends Channe } /** - * - * <p>Title: Internal heartbeat thread</p> + * <p>Title: Internal heartbeat runnable</p> * * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class * is created</p> - * - * @version 1.0 */ - public static class HeartbeatThread extends Thread { - private static final Log log = LogFactory.getLog(HeartbeatThread.class); - protected static int counter = 1; - protected static synchronized int inc() { - return counter++; - } - - protected volatile boolean doRun = true; - protected final GroupChannel channel; - protected final long sleepTime; - public HeartbeatThread(GroupChannel channel, long sleepTime) { - super(); - this.setPriority(MIN_PRIORITY); - String channelName = ""; - if (channel.getName() != null) channelName = "[" + channel.getName() + "]"; - setName("GroupChannel-Heartbeat" + channelName + "-" +inc()); - setDaemon(true); - this.channel = channel; - this.sleepTime = sleepTime; - } - public void stopHeartbeat() { - doRun = false; - interrupt(); - } - + public class HeartbeatRunnable implements Runnable { @Override public void run() { - while (doRun) { - try { - Thread.sleep(sleepTime); - channel.heartbeat(); - } catch ( InterruptedException x ) { - // Ignore. Probably triggered by a call to stopHeartbeat(). - // In the highly unlikely event it was a different trigger, - // simply ignore it and continue. - } catch ( Exception x ) { - log.error(sm.getString("groupChannel.unable.sendHeartbeat"),x); - }//catch - }//while - }//run - }//HeartbeatThread - - + try { + heartbeat(); + } catch (Exception x) { + log.error(sm.getString("groupChannel.unable.sendHeartbeat"), x); + } + } + } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties [UTF-8] (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties [UTF-8] Thu Nov 8 10:43:00 2018 @@ -23,4 +23,5 @@ groupChannel.sendFail.noRpcChannelReply= groupChannel.optionFlag.conflict=Interceptor option flag conflict: [{0}] groupChannel.listener.alreadyExist=Listener already exists:[{0}][{1}] groupChannel.unable.sendHeartbeat=Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again. +groupChannel.warn.noUtilityExecutor=No utility executor was set, creating one rpcChannel.replyFailed=Unable to send back reply in RpcChannel. Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java Thu Nov 8 10:43:00 2018 @@ -18,6 +18,7 @@ package org.apache.catalina.tribes.group import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; @@ -156,8 +157,9 @@ public class StaticMembershipInterceptor if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ); if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ); final ChannelInterceptorBase base = this; + ScheduledExecutorService executor = getChannel().getUtilityExecutor(); for (final Member member : members) { - Thread t = new Thread() { + Runnable r = new Runnable() { @Override public void run() { base.memberAdded(member); @@ -166,7 +168,7 @@ public class StaticMembershipInterceptor } } }; - t.start(); + executor.execute(r); } super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ)); Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java Thu Nov 8 10:43:00 2018 @@ -18,14 +18,12 @@ package org.apache.catalina.tribes.membership; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipProvider; import org.apache.catalina.tribes.MembershipService; -import org.apache.catalina.tribes.util.ExecutorFactory; public abstract class MembershipProviderBase implements MembershipProvider { @@ -33,7 +31,7 @@ public abstract class MembershipProvider protected MembershipListener membershipListener; protected MembershipService service; // The event notification executor - protected final ExecutorService executor = ExecutorFactory.newThreadPool(0, 10, 10, TimeUnit.SECONDS); + protected ScheduledExecutorService executor; @Override public void init(Properties properties) throws Exception { @@ -65,5 +63,6 @@ public abstract class MembershipProvider @Override public void setMembershipService(MembershipService service) { this.service = service; + executor = service.getChannel().getUtilityExecutor(); } } \ No newline at end of file Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1846119&r1=1846118&r2=1846119&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Thu Nov 8 10:43:00 2018 @@ -77,6 +77,16 @@ </update> </changelog> </subsection> + <subsection name="Tribes"> + <changelog> + <update> + Refactor various operations performed in tribes using a scheduled + executor. When tribes is not running standalone, it will use the + executor from the Catalina Service. If running independently, the + Channel will provide the executor. (remm) + </update> + </changelog> + </subsection> </section> <section name="Tomcat 9.0.13 (markt)" rtext="2018-11-07"> <subsection name="Catalina"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org