Author: cmueller Date: Mon Feb 21 22:35:27 2011 New Revision: 1073178 URL: http://svn.apache.org/viewvc?rev=1073178&view=rev Log: CAMEL-3656: Add support for Asynchronous smpp interaction - work in progress
Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java?rev=1073178&r1=1073177&r2=1073178&view=diff ============================================================================== --- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java (original) +++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Mon Feb 21 22:35:27 2011 @@ -17,9 +17,12 @@ package org.apache.camel.component.smpp; import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReentrantLock; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.impl.DefaultProducer; import org.jsmpp.DefaultPDUReader; import org.jsmpp.DefaultPDUSender; @@ -47,7 +50,7 @@ import org.slf4j.LoggerFactory; * @version * @author muellerc */ -public class SmppProducer extends DefaultProducer { +public class SmppProducer extends DefaultAsyncProducer { private static final transient Logger LOG = LoggerFactory.getLogger(SmppProducer.class); @@ -114,40 +117,58 @@ public class SmppProducer extends Defaul } } - public void process(Exchange exchange) throws Exception { + public boolean process(Exchange exchange, AsyncCallback callback) { if (LOG.isDebugEnabled()) { LOG.debug("Sending a short message for exchange id '" + exchange.getExchangeId() + "'..."); } + if (!isRunAllowed()) { + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + callback.done(true); + return true; + } + // only possible by trying to reconnect if (this.session == null) { - throw new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected"); + exchange.setException(new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected")); + callback.done(true); + return true; } - SubmitSm submitSm = getEndpoint().getBinding().createSubmitSm(exchange); - String messageId = session.submitShortMessage( - submitSm.getServiceType(), - TypeOfNumber.valueOf(submitSm.getSourceAddrTon()), - NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()), - submitSm.getSourceAddr(), - TypeOfNumber.valueOf(submitSm.getDestAddrTon()), - NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()), - submitSm.getDestAddress(), - new ESMClass(), - submitSm.getProtocolId(), - submitSm.getPriorityFlag(), - submitSm.getScheduleDeliveryTime(), - submitSm.getValidityPeriod(), - new RegisteredDelivery(submitSm.getRegisteredDelivery()), - submitSm.getReplaceIfPresent(), - new GeneralDataCoding( - false, - false, - MessageClass.CLASS1, - Alphabet.valueOf(submitSm.getDataCoding())), - (byte) 0, - submitSm.getShortMessage()); + SubmitSm submitSm = null; + String messageId = null; + try { + submitSm = getEndpoint().getBinding().createSubmitSm(exchange); + messageId = session.submitShortMessage( + submitSm.getServiceType(), + TypeOfNumber.valueOf(submitSm.getSourceAddrTon()), + NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()), + submitSm.getSourceAddr(), + TypeOfNumber.valueOf(submitSm.getDestAddrTon()), + NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()), + submitSm.getDestAddress(), + new ESMClass(), + submitSm.getProtocolId(), + submitSm.getPriorityFlag(), + submitSm.getScheduleDeliveryTime(), + submitSm.getValidityPeriod(), + new RegisteredDelivery(submitSm.getRegisteredDelivery()), + submitSm.getReplaceIfPresent(), + new GeneralDataCoding( + false, + false, + MessageClass.CLASS1, + Alphabet.valueOf(submitSm.getDataCoding())), + (byte) 0, + submitSm.getShortMessage()); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } if (LOG.isDebugEnabled()) { LOG.debug("Sent a short message for exchange id '" @@ -166,6 +187,9 @@ public class SmppProducer extends Defaul } exchange.getIn().setHeader(SmppBinding.ID, messageId); } + + // continue routing asynchronously + return false; } @Override