Author: davsclaus Date: Tue Nov 13 08:50:08 2012 New Revision: 1408638 URL: http://svn.apache.org/viewvc?rev=1408638&view=rev Log: Removed evil singleton. Polished code.
Removed: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1408638&r1=1408637&r2=1408638&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Tue Nov 13 08:50:08 2012 @@ -17,7 +17,6 @@ package org.apache.camel.component.sjms; import java.util.Map; - import javax.jms.ConnectionFactory; import org.apache.camel.CamelException; @@ -26,7 +25,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.component.sjms.jms.ConnectionFactoryResource; import org.apache.camel.component.sjms.jms.ConnectionResource; import org.apache.camel.component.sjms.jms.KeyFormatStrategy; -import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory; +import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.impl.DefaultComponent; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -35,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Represents the component that manages {@link SimpleJmsEndpoint}. + * The <a href="http://camel.apache.org/sjms">Simple JMS</a> component. */ public class SjmsComponent extends DefaultComponent implements HeaderFilterStrategyAware { private static final transient Logger LOGGER = LoggerFactory.getLogger(SjmsComponent.class); @@ -46,17 +45,8 @@ public class SjmsComponent extends Defau private KeyFormatStrategy keyFormatStrategy; private Integer connectionCount = 1; private TransactionCommitStrategy transactionCommitStrategy; + private TimedTaskManager timedTaskManager; - /** - * @see - * org.apache.camel.impl.DefaultComponent#createEndpoint(java.lang.String, - * java.lang.String, java.util.Map) - * @param uri The value passed into our call to create an endpoint - * @param remaining - * @param parameters - * @return - * @throws Exception - */ @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { validateMepAndReplyTo(parameters); @@ -80,7 +70,7 @@ public class SjmsComponent extends Defau * @return String * @throws Exception */ - private String normalizeUri(String uri) throws Exception { + private static String normalizeUri(String uri) throws Exception { String tempUri = uri; String endpointName = tempUri.substring(0, tempUri.indexOf(":")); tempUri = tempUri.substring(endpointName.length()); @@ -93,7 +83,7 @@ public class SjmsComponent extends Defau } if (ObjectHelper.isEmpty(protocol)) { protocol = "queue"; - } else if (protocol.equals("queue") || protocol.equals("topic")) { + } else if (protocol != null && (protocol.equals("queue") || protocol.equals("topic"))) { tempUri = tempUri.substring(protocol.length() + 1); } else { throw new Exception("Unsupported Protocol: " + protocol); @@ -114,14 +104,14 @@ public class SjmsComponent extends Defau * @throws Exception throws a {@link CamelException} when MEP equals InOnly * and namedReplyTo is defined. */ - private void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception { + private static void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception { boolean namedReplyToSet = parameters.containsKey("namedReplyTo"); boolean mepSet = parameters.containsKey("exchangePattern"); if (namedReplyToSet && mepSet) { if (!parameters.get("exchangePattern").equals(ExchangePattern.InOut.toString())) { String namedReplyTo = (String)parameters.get("namedReplyTo"); ExchangePattern mep = ExchangePattern.valueOf((String)parameters.get("exchangePattern")); - throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut. Parameter exchangePattern is set to " + mep); + throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut. Parameter exchangePattern is set to " + mep); } } } @@ -130,9 +120,11 @@ public class SjmsComponent extends Defau protected void doStart() throws Exception { super.doStart(); - LOGGER.debug("Verify ConnectionResource"); + timedTaskManager = new TimedTaskManager(); + + LOGGER.trace("Verify ConnectionResource"); if (getConnectionResource() == null) { - LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource."); + LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource."); // We always use a connection pool, even for a pool of 1 ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory()); connections.fillPool(); @@ -144,7 +136,8 @@ public class SjmsComponent extends Defau @Override protected void doStop() throws Exception { - TimedTaskManagerFactory.getInstance().cancelTasks(); + timedTaskManager.cancelTasks(); + if (getConnectionResource() != null) { if (getConnectionResource() instanceof ConnectionFactoryResource) { ((ConnectionFactoryResource)getConnectionResource()).drainPool(); @@ -156,9 +149,6 @@ public class SjmsComponent extends Defau /** * Sets the ConnectionFactory value of connectionFactory for this instance * of SjmsComponent. - * - * @param connectionFactory Sets ConnectionFactory, default is TODO add - * default */ public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; @@ -221,11 +211,16 @@ public class SjmsComponent extends Defau /** * Sets the TransactionCommitStrategy value of transactionCommitStrategy for this * instance of SjmsComponent. - * - * @param transactionCommitStrategy Sets TransactionCommitStrategy, default is TODO add - * default */ public void setTransactionCommitStrategy(TransactionCommitStrategy commitStrategy) { this.transactionCommitStrategy = commitStrategy; } + + public TimedTaskManager getTimedTaskManager() { + return timedTaskManager; + } + + public void setTimedTaskManager(TimedTaskManager timedTaskManager) { + this.timedTaskManager = timedTaskManager; + } } Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1408638&r1=1408637&r2=1408638&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Tue Nov 13 08:50:08 2012 @@ -33,6 +33,7 @@ import org.apache.camel.component.sjms.j import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.jms.ObjectPool; import org.apache.camel.component.sjms.jms.SessionPool; +import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy; import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization; @@ -62,9 +63,6 @@ public class SjmsConsumer extends Defaul * Creates a new MessageConsumerResources instance. * * @see org.apache.camel.component.sjms.jms.ObjectPool#createObject() - * - * @return - * @throws Exception */ @Override protected MessageConsumerResources createObject() throws Exception { @@ -81,9 +79,6 @@ public class SjmsConsumer extends Defaul * Cleans up the MessageConsumerResources. * * @see org.apache.camel.component.sjms.jms.ObjectPool#destroyObject(java.lang.Object) - * - * @param model - * @throws Exception */ @Override protected void destroyObject(MessageConsumerResources model) throws Exception { @@ -112,19 +107,12 @@ public class SjmsConsumer extends Defaul private final Session session; private final MessageConsumer messageConsumer; - /** - * @param messageProducer - */ public MessageConsumerResources(MessageConsumer messageConsumer) { super(); this.session = null; this.messageConsumer = messageConsumer; } - /** - * @param session - * @param messageProducer - */ public MessageConsumerResources(Session session, MessageConsumer messageConsumer) { super(); this.session = session; @@ -158,6 +146,11 @@ public class SjmsConsumer extends Defaul } @Override + public SjmsEndpoint getEndpoint() { + return (SjmsEndpoint) super.getEndpoint(); + } + + @Override protected void doStart() throws Exception { super.doStart(); consumers = new MessageConsumerPool(); @@ -173,24 +166,9 @@ public class SjmsConsumer extends Defaul } } - @Override - protected void doResume() throws Exception { - super.doResume(); - doStart(); - } - - @Override - protected void doSuspend() throws Exception { - doStop(); - super.doSuspend(); - } - /** * Creates a {@link MessageConsumerResources} with a dedicated * {@link Session} required for transacted and InOut consumers. - * - * @return MessageConsumerResources - * @throws Exception */ private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception { Connection conn = getConnectionResource().borrowConnection(); @@ -210,9 +188,6 @@ public class SjmsConsumer extends Defaul /** * Creates a {@link MessageConsumerResources} with a shared {@link Session} * for non-transacted InOnly consumers. - * - * @return - * @throws Exception */ private MessageConsumerResources createConsumerListener() throws Exception { Session queueSession = getSessionPool().borrowObject(); @@ -233,7 +208,7 @@ public class SjmsConsumer extends Defaul * Helper factory method used to create a MessageListener based on the MEP * * @param session a session is only required if we are a transacted consumer - * @return + * @return the listener */ protected MessageListener createMessageHandler(Session session) { @@ -245,15 +220,16 @@ public class SjmsConsumer extends Defaul } else { commitStrategy = new DefaultTransactionCommitStrategy(); } - - Synchronization synchronization = null; + + Synchronization synchronization; if (commitStrategy instanceof BatchTransactionCommitStrategy) { - synchronization = new SessionBatchTransactionSynchronization(session, commitStrategy, getTransactionBatchTimeout()); + TimedTaskManager timedTaskManager = getEndpoint().getComponent().getTimedTaskManager(); + synchronization = new SessionBatchTransactionSynchronization(timedTaskManager, session, commitStrategy, getTransactionBatchTimeout()); } else { synchronization = new SessionTransactionSynchronization(session, commitStrategy); } - AbstractMessageHandler messageHandler = null; + AbstractMessageHandler messageHandler; if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) { if (isTransacted()) { messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization); @@ -338,9 +314,7 @@ public class SjmsConsumer extends Defaul } /** - * Sets the JMS Message selector syntax. - * - * @param messageSelector Message selector syntax or null + * Gets the JMS Message selector syntax. */ public String getMessageSelector() { return getSjmsEndpoint().getMessageSelector(); Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1408638&r1=1408637&r2=1408638&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Tue Nov 13 08:50:08 2012 @@ -69,11 +69,16 @@ public class SjmsEndpoint extends Defaul } else if (getEndpointUri().indexOf("://topic:") > -1) { topic = true; } else { - throw new RuntimeCamelException("Endpoint URI unsupported: " + uri); + throw new IllegalArgumentException("Endpoint URI unsupported: " + uri); } } @Override + public SjmsComponent getComponent() { + return (SjmsComponent) super.getComponent(); + } + + @Override protected void doStart() throws Exception { super.doStart(); Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java?rev=1408638&r1=1408637&r2=1408638&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java Tue Nov 13 08:50:08 2012 @@ -28,9 +28,9 @@ import java.util.concurrent.locks.Reentr public class TimedTaskManager { private final Timer timer = new Timer(); - private ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); - TimedTaskManager() { + public TimedTaskManager() { } public void addTask(TimerTask task, long delay) { Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java?rev=1408638&r1=1408637&r2=1408638&view=diff ============================================================================== --- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java (original) +++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Tue Nov 13 08:50:08 2012 @@ -24,28 +24,27 @@ import javax.jms.Session; import org.apache.camel.Exchange; import org.apache.camel.component.sjms.TransactionCommitStrategy; -import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory; +import org.apache.camel.component.sjms.taskmanager.TimedTaskManager; import org.apache.camel.spi.Synchronization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * SessionTransactionSynchronization is called at the completion of each - * {@link org.apache.camel.Exhcnage}. + * {@link org.apache.camel.Exchange}. */ public class SessionBatchTransactionSynchronization implements Synchronization { - private Logger log = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(SessionBatchTransactionSynchronization.class); private Session session; private final TransactionCommitStrategy commitStrategy; private long batchTransactionTimeout = 5000; private TimeoutTask currentTask; private ReadWriteLock lock = new ReentrantReadWriteLock(); + private final TimedTaskManager timedTaskManager; - public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) { - this(session, commitStrategy, 5000); - } - - public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) { + public SessionBatchTransactionSynchronization(TimedTaskManager timedTaskManager, + Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) { + this.timedTaskManager = timedTaskManager; this.session = session; if (commitStrategy == null) { this.commitStrategy = new DefaultTransactionCommitStrategy(); @@ -58,43 +57,39 @@ public class SessionBatchTransactionSync } } - /** - * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange) - * @param exchange - */ @Override public void onFailure(Exchange exchange) { try { lock.readLock().lock(); if (commitStrategy.rollback(exchange)) { - log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing failure of Exchange id:{}", exchange.getExchangeId()); + } if (session != null && session.getTransacted()) { session.rollback(); } } } catch (Exception e) { - log.warn("Failed to rollback the session: {}", e.getMessage()); + LOG.warn("Failed to rollback the session: " + e.getMessage() + ". This exception will be ignored.", e); } finally { lock.readLock().unlock(); } } - /** - * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange) - * @param exchange - */ @Override public void onComplete(Exchange exchange) { try { lock.readLock().lock(); if (commitStrategy.commit(exchange)) { - log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing completion of Exchange id:{}", exchange.getExchangeId()); + } if (session != null && session.getTransacted()) { session.commit(); } } } catch (Exception e) { - log.warn("Failed to commit the session: {}", e.getMessage()); + LOG.warn("Failed to commit the session: " + e.getMessage() + ". This exception will be ignored.", e); exchange.setException(e); } finally { lock.readLock().unlock(); @@ -119,17 +114,12 @@ public class SessionBatchTransactionSync } finally { lock.writeLock().unlock(); } - TimedTaskManagerFactory.getInstance().addTask(currentTask, batchTransactionTimeout); + timedTaskManager.addTask(currentTask, batchTransactionTimeout); } - public class TimeoutTask extends TimerTask { + public final class TimeoutTask extends TimerTask { - /** - * Default constructor - * - * @param str - */ - TimeoutTask() { + private TimeoutTask() { } /** @@ -137,17 +127,17 @@ public class SessionBatchTransactionSync * transaction. */ public void run() { - log.info("Batch Transaction Timer expired:"); + LOG.debug("Batch Transaction Timer expired"); try { lock.writeLock().lock(); - log.debug("Committing the current transactions"); + LOG.trace("Committing the current transactions"); try { if (session != null && session.getTransacted()) { session.commit(); } ((BatchTransactionCommitStrategy)commitStrategy).reset(); } catch (Exception e) { - log.warn("Failed to commit the session during timeout: {}", e.getMessage()); + LOG.warn("Failed to commit the session during timeout: " + e.getMessage() + ". This exception will be ignored.", e); } } finally { lock.writeLock().unlock(); @@ -156,9 +146,7 @@ public class SessionBatchTransactionSync @Override public boolean cancel() { - if (log.isTraceEnabled()) { - log.trace("Cancelling the TimeoutTask"); - } + LOG.trace("Cancelling the TimeoutTask"); return super.cancel(); } }