Author: davsclaus Date: Thu Mar 12 07:32:52 2009 New Revision: 752791 URL: http://svn.apache.org/viewvc?rev=752791&view=rev Log: Fixed UDP being slow and not reliable in stress tests. Thanks to Orton for sample client demonstrating this.
Added: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java (with props) Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java Added: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java?rev=752791&view=auto ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java (added) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java Thu Mar 12 07:32:52 2009 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mina; + +/** + * Mina constants + * + * @version $Revision$ + */ +public final class MinaConstants { + + public static final transient String MINA_CLOSE_SESSION_WHEN_COMPLETE = "CamelMinaCloseSessionWhenComplete"; + + private MinaConstants() { + // Utility class + } +} Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConstants.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java?rev=752791&r1=752790&r2=752791&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java Thu Mar 12 07:32:52 2009 @@ -19,10 +19,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.ObjectInput; +import java.io.ObjectInputStream; import org.apache.camel.Converter; import org.apache.camel.Exchange; -import org.apache.camel.converter.IOConverter; import org.apache.mina.common.ByteBuffer; /** @@ -46,7 +46,9 @@ @Converter public static String toString(ByteBuffer buffer, Exchange exchange) { - return IOConverter.toString(toByteArray(buffer), exchange); + byte[] bytes = toByteArray(buffer); + // use type converter as it can handle encoding set on the Exchange + return exchange.getContext().getTypeConverter().convertTo(String.class, exchange, bytes); } @Converter @@ -56,7 +58,8 @@ @Converter public static ObjectInput toObjectInput(ByteBuffer buffer) throws IOException { - return IOConverter.toObjectInput(toInputStream(buffer)); + InputStream is = toInputStream(buffer); + return new ObjectInputStream(is); } @Converter Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java?rev=752791&r1=752790&r2=752791&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java Thu Mar 12 07:32:52 2009 @@ -18,6 +18,8 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; @@ -26,6 +28,8 @@ */ public final class MinaHelper { + private static final transient Log LOG = LogFactory.getLog(MinaHelper.class); + private MinaHelper() { //Utility Class } @@ -33,9 +37,9 @@ /** * Writes the given body to MINA session. Will wait until the body has been written. * - * @param session the MINA session - * @param body the body to write (send) - * @param exchange the mina exchange used for error reporting + * @param session the MINA session + * @param body the body to write (send) + * @param exchange the mina exchange used for error reporting * @throws CamelExchangeException is thrown if the body could not be written for some reasons * (eg remote connection is closed etc.) */ @@ -44,7 +48,8 @@ WriteFuture future = session.write(body); future.join(); if (!future.isWritten()) { - throw new CamelExchangeException("Could not write body", exchange); + LOG.warn("Cannot write body: " + body + " using session: " + session); + throw new CamelExchangeException("Cannot write body", exchange); } } Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java?rev=752791&r1=752790&r2=752791&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java Thu Mar 12 07:32:52 2009 @@ -29,6 +29,7 @@ * @version $Revision$ */ public final class MinaPayloadHelper { + private MinaPayloadHelper() { //Utility Class } Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java?rev=752791&r1=752790&r2=752791&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java Thu Mar 12 07:32:52 2009 @@ -21,6 +21,7 @@ import java.nio.charset.CharsetEncoder; import org.apache.camel.CamelContext; +import org.apache.camel.TypeConverter; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; @@ -82,16 +83,36 @@ } private ByteBuffer toByteBuffer(Object message, CharsetEncoder encoder) throws CharacterCodingException { - ByteBuffer answer; - try { - answer = context.getTypeConverter().convertTo(ByteBuffer.class, message); - } catch (NoTypeConversionAvailableException e) { - String value = context.getTypeConverter().convertTo(String.class, message); - answer = ByteBuffer.allocate(value.length()).setAutoExpand(true); + // for fast convertions try this first instead of type converter registry + String value = null; + if (message instanceof String) { + value = (String) message; + } else { + // try to lookup if there is a string converter + TypeConverter tc = context.getTypeConverterRegistry().lookup(String.class, message.getClass()); + if (tc != null) { + value = tc.convertTo(String.class, message); + } + } + + TypeConverter tc = context.getTypeConverterRegistry().lookup(ByteBuffer.class, message.getClass()); + if (tc == null && value == null) { + // use the slower converter that throws exception + try { + value = context.getTypeConverter().convertTo(String.class, message); + } catch (NoTypeConversionAvailableException e) { + // ignore + } + } + + if (value != null) { + ByteBuffer answer = ByteBuffer.allocate(value.length()).setAutoExpand(false); answer.putString(value, encoder); + return answer; } - return answer; - } + // failback if ther is a byte buffer type converter, in case there is a fallback converter as well + return context.getTypeConverter().convertTo(ByteBuffer.class, message); + } }