Author: raulk Date: Mon Feb 4 01:44:24 2013 New Revision: 1442004 URL: http://svn.apache.org/viewvc?rev=1442004&view=rev Log: CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC (camel-jms)
Added: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java - copied unchanged from r1442002, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java - copied unchanged from r1442002, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java Modified: camel/branches/camel-2.10.x/components/camel-jms/ (props changed) camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Propchange: camel/branches/camel-2.10.x/components/camel-jms/ ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Mon Feb 4 01:44:24 2013 @@ -0,0 +1,2 @@ +/camel/trunk:1441998,1442002 +/camel/trunk/components/camel-jms:1359013,1359197,1359226,1359265,1359341,1360031,1360525-1360527,1360581,1360583,1360719,1361116-1361117,1361126,1361477,1361919,1361973,1365013,1365061,1365616,1365917,1365939,1366272,1366321,1369457,1369490,1369671,1370254,1370783,1371570,1371967,1372244-1372245,1372259,1372329,1372354,1372485,1372833,1372845,1372863,1373374,1373402,1373442,1373453,1373691-1373692,1373722,1373751,1373763,1373785,1374110,1374133,1374290,1374332,1374388,1374530,1374539,1374582-1374584,1374639,1374709,1374926,1374949,1375019,1375027,1375439,1375458,1375460,1375900,1375904,1375936,1376088,1376110,1376140,1376375,1376377,1376870,1377243,1377256,1377272,1377395,1377400,1377402,1377407,1377413,1377608,1378476,1378731,1378825,1378827,1379320,1379324,1379354,1379709,1380157,1380232,1381089,1381137,1381182,1381196,1381552,1381898,1382039,1382416,1382423,1382433,1383220,1383824,1383826,1383858,1384122,1384226,1384316,1384369,1384447,1384609,1384799,1384815,1386495,138 6607,1386671,1387019-1387020,1387142,1387545,1387808,1388345,1388750,1388762,1389251,1389326,1389813,1389941,1390820,1390946,1391008,1391021,1391044,1391339,1391346,1391373,1391378,1391439,1391466,1391954,1391958,1392188,1392869,1393291,1393294,1393962,1394388,1394408,1394479,1394577,1394932,1394940,1394943,1394991,1395046,1395085,1395226,1395580,1395626,1395645,1395883,1395891,1398047,1398694,1398756,1398760,1398789,1398797,1398810,1399134,1399165,1399317,1399333,1399519,1399664,1399808,1400074,1400097,1400403,1400729,1400734,1400831,1400863,1400876,1401151,1401159-1401166,1401209,1401264,1401578,1401585,1402383,1402485,1402738,1403186,1403202,1403725,1403863,1403971,1403988,1403990,1404015,1404020,1404031,1404056,1404172,1404233,1404291,1404303,1404474,1404490-1404491,1404496,1404508-1404510,1404519,1404532,1404921,1404934,1405313,1405662,1405678,1405727,1405932,1406066,1406948-1406952,1407104,1407395,1407487-1407488,1407736,1407744,1407748,1407750,1407752,1407764,1407768, 1407776,1407818,1407826,1407970,1407975,1407978,1407985,1407987,1407992,1407995,1408185-1408186,1408188,1408193,1408262,1408269,1408297,1408306,1408365,1408373,1408660-1408661,1408667,1408670,1408704,1409155,1409215,1409246,1409398,1409424,1409651,1409678,1409689,1409734,1409787,1410217,1410276,1410336,1410346,1410709,1410906,1410952,1410959,1411129,1411217,1411414,1411617,1411664,1412012,1412100,1412106,1412144,1412434,1412473,1412492,1412495,1412499,1412524,1412672,1412774,1412907,1413312,1413317,1413979,1414022,1414564,1414571,1414740,1414836,1414857,1415129,1415534,1415564,1415838,1415884,1415888,1415979,1416006,1416246,1417318-1417330,1418239,1418595,1418616,1418708,1419305,1419415,1419629,1419655,1419729,1419757,1420038,1420051,1420112,1420523-1420524,1420534,1420610,1421306,1421712,1421723,1421764,1421864,1422241,1422568,1422792,1422807,1422957,1422963,1423299,1423301,1423304,1423797,1423808,1423929,1424386,1424398,1424437,1424915,1425102,1425106,1425262,1425295,14254 45,1425653,1425724,1426385,1426459,1426748,1427806,1427815,1427825,1427853,1427989,1428463,1429218,1429271,1429280,1429282,1429477,1429527,1429529,1429707,1430724,1430731,1431152,1431273,1431280,1431291,1431297,1431336,1431626,1431642,1431652,1431705,1431781,1431784,1431813,1431819-1431820,1431891,1431902,1431933,1432413,1432435,1432629,1432632,1433518-1433519,1433855,1433891,1435219,1435497,1435580,1435700,1435812,1435861,1436244,1436308,1436384,1436389,1436420,1436438,1436441,1436487,1436505,1436603,1436619,1436759,1436904,1436944,1436980,1437139,1437158,1437160,1437208,1437283,1438017,1438063,1438363,1438380,1438499-1438500,1438662,1438880,1439433,1439451,1439468,1439473,1439639,1439771,1440280,1440328,1440360,1441387,1441392,1441508,1441518,1441876,1441879,1442002 Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442004&r1=1442003&r2=1442004&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java (original) +++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java Mon Feb 4 01:44:24 2013 @@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent. import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * The default {@link DefaultMessageListenerContainer container} which listen for messages @@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo /** * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. - * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} - * with the specified bean name and using Camel's {@link org.apache.camel.spi.ExecutorServiceManager} + * <p /> + * The type of {@link TaskExecutor} will depend on the value of + * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more details, refer to the Javadoc of + * {@link DefaultTaskExecutorType}. + * <p /> + * In all cases, it uses the specified bean name and Camel's {@link org.apache.camel.spi.ExecutorServiceManager} * to resolve the thread name. - * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) + * @see JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType) + * @see ThreadPoolTaskExecutor#setBeanName(String) */ @Override protected TaskExecutor createDefaultTaskExecutor() { String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern(); - String beanName = getBeanName(); + String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName(); - SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName); - answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); - return answer; + if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) { + ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor(); + answer.setBeanName(beanName); + answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + answer.setCorePoolSize(endpoint.getConcurrentConsumers()); + // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately. + // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by + // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as + // defined by maxConcurrentConsumers). + answer.setQueueCapacity(0); + answer.initialize(); + return answer; + } else { + SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName); + answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true)); + return answer; + } } - + } Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442004&r1=1442003&r2=1442004&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original) +++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Mon Feb 4 01:44:24 2013 @@ -18,6 +18,7 @@ package org.apache.camel.component.jms; import java.util.Map; import java.util.concurrent.ExecutorService; + import javax.jms.ConnectionFactory; import javax.jms.ExceptionListener; import javax.jms.Session; @@ -184,7 +185,7 @@ public class JmsComponent extends Defaul public void setCacheLevelName(String cacheName) { getConfiguration().setCacheLevelName(cacheName); } - + public void setReplyToCacheLevelName(String cacheName) { getConfiguration().setReplyToCacheLevelName(cacheName); } @@ -236,7 +237,7 @@ public class JmsComponent extends Defaul public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit); } - + public void setIdleConsumerLimit(int idleConsumerLimit) { getConfiguration().setIdleConsumerLimit(idleConsumerLimit); } Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442004&r1=1442003&r2=1442004&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Feb 4 01:44:24 2013 @@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; /** - * @version + * @version */ public class JmsConfiguration implements Cloneable { @@ -137,6 +137,7 @@ public class JmsConfiguration implements private boolean allowNullBody = true; private MessageListenerContainerFactory messageListenerContainerFactory; private boolean includeSentJMSMessageID; + private DefaultTaskExecutorType defaultTaskExecutorType; public JmsConfiguration() { } @@ -386,7 +387,7 @@ public class JmsConfiguration implements case Default: return new DefaultJmsMessageListenerContainer(endpoint); case Custom: - return getCustomMessageListenerContainer(endpoint); + return getCustomMessageListenerContainer(endpoint); default: throw new IllegalArgumentException("Unknown consumer type: " + consumerType); } @@ -1313,4 +1314,16 @@ public class JmsConfiguration implements public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { this.includeSentJMSMessageID = includeSentJMSMessageID; } + + public DefaultTaskExecutorType getDefaultTaskExecutorType() { + return defaultTaskExecutorType; + } + + /** + * Indicates what type of {@link TaskExecutor} to use by default for JMS consumers. + * Refer to the documentation of {@link DefaultTaskExecutorType} for available options. + */ + public void setDefaultTaskExecutorType(DefaultTaskExecutorType defaultTaskExecutorType) { + this.defaultTaskExecutorType = defaultTaskExecutorType; + } } Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442004&r1=1442003&r2=1442004&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Feb 4 01:44:24 2013 @@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan /** * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a> * - * @version + * @version */ @ManagedResource(description = "Managed JMS Endpoint") public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service { @@ -176,24 +176,32 @@ public class JmsEndpoint extends Default listenerContainer.setPubSubDomain(pubSubDomain); // include destination name as part of thread and transaction name - String consumerName = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]"; + String consumerName = getThreadName(); if (configuration.getTaskExecutor() != null) { if (log.isDebugEnabled()) { log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(), listenerContainer); } setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor()); - } else { + } else if ((listenerContainer instanceof DefaultJmsMessageListenerContainer && configuration.getDefaultTaskExecutorType() == null) + || !(listenerContainer instanceof DefaultJmsMessageListenerContainer)) { + // preserve backwards compatibility if an explicit Default TaskExecutor Type was not set; + // otherwise, defer the creation of the TaskExecutor // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, consumerName); setContainerTaskExecutor(listenerContainer, executor); + } else { + // do nothing, as we're working with a DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType, + // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle the creation + log.debug("Deferring creation of TaskExecutor for listener container: {} as per policy: {}", + listenerContainer, configuration.getDefaultTaskExecutorType()); } - + // set a default transaction name if none provided if (configuration.getTransactionName() == null) { if (listenerContainer instanceof DefaultMessageListenerContainer) { ((DefaultMessageListenerContainer) listenerContainer).setTransactionName(consumerName); - } + } } } @@ -271,6 +279,10 @@ public class JmsEndpoint extends Default return true; } + public String getThreadName() { + return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]"; + } + // Properties // ------------------------------------------------------------------------- @@ -448,7 +460,7 @@ public class JmsEndpoint extends Default public String getCacheLevelName() { return getConfiguration().getCacheLevelName(); } - + @ManagedAttribute public String getReplyToCacheLevelName() { return getConfiguration().getReplyToCacheLevelName(); @@ -489,7 +501,7 @@ public class JmsEndpoint extends Default public LoggingLevel getErrorHandlerLoggingLevel() { return getConfiguration().getErrorHandlerLoggingLevel(); } - + @ManagedAttribute public boolean isErrorHandlerLogStackTrace() { return getConfiguration().isErrorHandlerLogStackTrace(); @@ -509,7 +521,7 @@ public class JmsEndpoint extends Default public int getIdleConsumerLimit() { return getConfiguration().getIdleConsumerLimit(); } - + public JmsOperations getJmsOperations() { return getConfiguration().getJmsOperations(); } @@ -724,7 +736,7 @@ public class JmsEndpoint extends Default public void setCacheLevelName(String cacheName) { getConfiguration().setCacheLevelName(cacheName); } - + @ManagedAttribute public void setReplyToCacheLevelName(String cacheName) { getConfiguration().setReplyToCacheLevelName(cacheName); @@ -795,7 +807,7 @@ public class JmsEndpoint extends Default public void setIdleConsumerLimit(int idleConsumerLimit) { getConfiguration().setIdleConsumerLimit(idleConsumerLimit); } - + public void setJmsOperations(JmsOperations jmsOperations) { getConfiguration().setJmsOperations(jmsOperations); } @@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default public void setAllowNullBody(boolean allowNullBody) { configuration.setAllowNullBody(allowNullBody); } - + @ManagedAttribute public boolean isIncludeSentJMSMessageID() { return configuration.isIncludeSentJMSMessageID(); @@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID); } + public DefaultTaskExecutorType getDefaultTaskExecutorType() { + return configuration.getDefaultTaskExecutorType(); + } + + public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) { + configuration.setDefaultTaskExecutorType(type); + } + public MessageListenerContainerFactory getMessageListenerContainerFactory() { return configuration.getMessageListenerContainerFactory(); }