Author: cmueller Date: Sat Jan 1 23:32:25 2011 New Revision: 1054321 URL: http://svn.apache.org/viewvc?rev=1054321&view=rev Log: MinaConsumer and MinaProducer now use the endpoint reference from their super class and don't manage their own instance variable
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=1054321&r1=1054320&r2=1054321&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat Jan 1 23:32:25 2011 @@ -40,7 +40,6 @@ import org.apache.mina.common.IoSession; public class MinaConsumer extends DefaultConsumer { private static final transient Log LOG = LogFactory.getLog(MinaConsumer.class); - private final MinaEndpoint endpoint; private final SocketAddress address; private final IoAcceptor acceptor; private boolean sync; @@ -48,7 +47,6 @@ public class MinaConsumer extends Defaul public MinaConsumer(final MinaEndpoint endpoint, Processor processor) { super(endpoint, processor); - this.endpoint = endpoint; this.address = endpoint.getAddress(); this.acceptor = endpoint.getAcceptor(); this.sync = endpoint.getConfiguration().isSync(); @@ -63,7 +61,7 @@ public class MinaConsumer extends Defaul } IoHandler handler = new ReceiveHandler(); - acceptor.bind(address, handler, endpoint.getAcceptorConfig()); + acceptor.bind(address, handler, getEndpoint().getAcceptorConfig()); } @Override @@ -74,6 +72,11 @@ public class MinaConsumer extends Defaul acceptor.unbind(address); super.doStop(); } + + @Override + public MinaEndpoint getEndpoint() { + return (MinaEndpoint) super.getEndpoint(); + } /** * Handles consuming messages and replying if the exchange is out capable. 
@@ -99,15 +102,15 @@ public class MinaConsumer extends Defaul Object in = object; if (in instanceof byte[]) { // byte arrays is not readable so convert to string - in = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, in); + in = getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in); } LOG.debug("Received body: " + in); } - Exchange exchange = endpoint.createExchange(session, object); + Exchange exchange = getEndpoint().createExchange(session, object); //Set the exchange charset property for converting - if (endpoint.getConfiguration().getCharsetName() != null) { - exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName())); + if (getEndpoint().getConfiguration().getCharsetName() != null) { + exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName())); } try { @@ -120,13 +123,13 @@ public class MinaConsumer extends Defaul if (sync) { Object body; if (ExchangeHelper.isOutCapable(exchange)) { - body = MinaPayloadHelper.getOut(endpoint, exchange); + body = MinaPayloadHelper.getOut(getEndpoint(), exchange); } else { - body = MinaPayloadHelper.getIn(endpoint, exchange); + body = MinaPayloadHelper.getIn(getEndpoint(), exchange); } boolean failed = exchange.isFailed(); - if (failed && !endpoint.getConfiguration().isTransferExchange()) { + if (failed && !getEndpoint().getConfiguration().isTransferExchange()) { if (exchange.getException() != null) { body = exchange.getException(); } else { @@ -137,7 +140,7 @@ public class MinaConsumer extends Defaul if (body == null) { noReplyLogger.log("No payload to send as reply for exchange: " + exchange); - if (endpoint.getConfiguration().isDisconnectOnNoReply()) { + if (getEndpoint().getConfiguration().isDisconnectOnNoReply()) { // must close session if no data to write otherwise client will never receive a response // and wait forever (if not timing out) if 
(LOG.isDebugEnabled()) { @@ -163,7 +166,7 @@ public class MinaConsumer extends Defaul } // should we disconnect, the header can override the configuration - boolean disconnect = endpoint.getConfiguration().isDisconnect(); + boolean disconnect = getEndpoint().getConfiguration().isDisconnect(); if (close != null) { disconnect = close; } @@ -175,6 +178,4 @@ public class MinaConsumer extends Defaul } } } - -} - +} \ No newline at end of file Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=1054321&r1=1054320&r2=1054321&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat Jan 1 23:32:25 2011 @@ -45,7 +45,6 @@ import org.apache.mina.transport.socket. 
public class MinaProducer extends DefaultProducer implements ServicePoolAware { private static final transient Log LOG = LogFactory.getLog(MinaProducer.class); private IoSession session; - private MinaEndpoint endpoint; private CountDownLatch latch; private boolean lazySessionCreation; private long timeout; @@ -55,12 +54,16 @@ public class MinaProducer extends Defaul public MinaProducer(MinaEndpoint endpoint) { super(endpoint); - this.endpoint = endpoint; this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation(); this.timeout = endpoint.getConfiguration().getTimeout(); this.sync = endpoint.getConfiguration().isSync(); this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel()); } + + @Override + public MinaEndpoint getEndpoint() { + return (MinaEndpoint) super.getEndpoint(); + } @Override public boolean isSingleton() { @@ -78,19 +81,19 @@ public class MinaProducer extends Defaul } // set the exchange encoding property - if (endpoint.getConfiguration().getCharsetName() != null) { - exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName())); + if (getEndpoint().getConfiguration().getCharsetName() != null) { + exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName())); } - Object body = MinaPayloadHelper.getIn(endpoint, exchange); + Object body = MinaPayloadHelper.getIn(getEndpoint(), exchange); if (body == null) { noReplyLogger.log("No payload to send for exchange: " + exchange); return; // exit early since nothing to write } // if textline enabled then covert to a String which must be used for textline - if (endpoint.getConfiguration().isTextline()) { - body = endpoint.getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body); + if (getEndpoint().getConfiguration().isTextline()) { + body = 
getEndpoint().getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body); } // if sync is true then we should also wait for a response (synchronous mode) @@ -130,7 +133,7 @@ public class MinaProducer extends Defaul throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause()); } else if (!handler.isMessageReceived()) { // no message received - throw new CamelExchangeException("No response received from remote server: " + endpoint.getEndpointUri(), exchange); + throw new CamelExchangeException("No response received from remote server: " + getEndpoint().getEndpointUri(), exchange); } else { // set the result on either IN or OUT on the original exchange depending on its pattern if (ExchangeHelper.isOutCapable(exchange)) { @@ -150,13 +153,13 @@ public class MinaProducer extends Defaul } // should we disconnect, the header can override the configuration - boolean disconnect = endpoint.getConfiguration().isDisconnect(); + boolean disconnect = getEndpoint().getConfiguration().isDisconnect(); if (close != null) { disconnect = close; } if (disconnect) { if (LOG.isDebugEnabled()) { - LOG.debug("Closing session when complete at address: " + endpoint.getAddress()); + LOG.debug("Closing session when complete at address: " + getEndpoint().getAddress()); } session.close(); } @@ -173,7 +176,7 @@ public class MinaProducer extends Defaul @Override protected void doStop() throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("Stopping connector: " + connector + " at address: " + endpoint.getAddress()); + LOG.debug("Stopping connector: " + connector + " at address: " + getEndpoint().getAddress()); } closeConnection(); super.doStop(); @@ -196,14 +199,14 @@ public class MinaProducer extends Defaul } private void openConnection() { - SocketAddress address = endpoint.getAddress(); - connector = endpoint.getConnector(); + SocketAddress address = getEndpoint().getAddress(); + connector = getEndpoint().getConnector(); if 
(LOG.isDebugEnabled()) { LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis."); } - IoHandler ioHandler = new ResponseHandler(endpoint); + IoHandler ioHandler = new ResponseHandler(getEndpoint()); // connect and wait until the connection is established - ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig()); + ConnectFuture future = connector.connect(address, ioHandler, getEndpoint().getConnectorConfig()); future.join(); session = future.getSession(); }